Add consolidation proposal for cascading logic duplication

Detailed design for extracting build_cascading_ctes(),
collect_permission_rules(), and build_restriction_filter() to
replace three separate implementations with one shared SQL builder.
Includes migration plan and handles the include_is_private
complication.

https://claude.ai/code/session_013EkyroQKPhcjdMbpHc9g4X
This commit is contained in:
Claude 2026-02-06 01:52:08 +00:00
commit 1fa23f4a42
No known key found for this signature in database

View file

@ -335,3 +335,393 @@ Refactor to avoid duplicating the entire CTE chain when restrictions are present
### P4: Add deny reason to permission check logging (Issue 12)
Return `(allowed, reason)` tuples from `check_permission_for_resource()`.
---
## Proposal: Consolidating the Cascading Logic (Issues 2 + 3)
### The problem
The cascading logic ("child > parent > global; deny beats allow at each level") is implemented three times in two files:
| # | Function | File | Used by | SQL pattern |
|---|----------|------|---------|-------------|
| 1 | `_build_single_action_sql()` | `actions_sql.py:246-384` | `allowed_resources()` (production) | 3 separate CTEs (`child_lvl`, `parent_lvl`, `global_lvl`) with `LEFT JOIN` + `GROUP BY` + `MAX()`, then CASE cascade |
| 2 | `check_permission_for_resource()` | `actions_sql.py:555-587` | `allowed()` (production) | `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY depth DESC, ...)` + `LIMIT 1` |
| 3 | `resolve_permissions_from_catalog()` | `permissions.py:197-391` | Tests only | Same `ROW_NUMBER()` as #2, but with the entire CTE chain **tripled** when restrictions are present |
These three implementations must all agree on the resolution semantics. A logic change (e.g., adding a priority tier) would need to be replicated in all three places.
### Why three exist
Each serves a different purpose with different requirements:
- **#1 (bulk resources)**: Needs to evaluate every `(parent, child)` in the `base` CTE. Can't use `ROW_NUMBER()` as easily because it needs the per-resource aggregates available for the `include_is_private` anonymous pass too. Outputs `reason` as JSON array and `is_private`.
- **#2 (single resource)**: Only checks one `(parent, child)`. Much simpler — just filter matching rules, rank, pick winner. Returns boolean.
- **#3 (test utility)**: Returns full resolution details (allow, reason, source_plugin, depth) for every candidate. Used in tests to verify the cascading logic itself.
### Proposed design: One SQL builder, three callers
Introduce a single function `build_cascading_ctes()` that generates the shared CTE fragment, then each caller wraps it with its own `SELECT` and extras.
#### Step 1: Extract `build_rules_union_from_permission_sqls()`
Both production paths (#1 and #2) already have nearly identical code to iterate over `PermissionSQL` objects, collect params, collect `restriction_sqls`, and build the UNION ALL. Factor this into a single shared function:
```python
# In actions_sql.py (or a new shared module)
@dataclass
class CollectedRules:
"""Result of collecting PermissionSQL objects into SQL fragments."""
rules_union: str # UNION ALL of all rule SELECTs
params: dict[str, Any] # All collected params
restriction_sqls: list[str] # restriction_sql fragments
def collect_permission_rules(
permission_sqls: list[PermissionSQL],
) -> CollectedRules | None:
"""
Iterate PermissionSQL objects, build the UNION ALL, collect params
and restriction_sqls. Returns None if no rule SQL was found.
"""
rule_parts = []
all_params = {}
restriction_sqls = []
for i, psql in enumerate(permission_sqls):
all_params.update(psql.params or {})
if psql.restriction_sql:
restriction_sqls.append(psql.restriction_sql)
if psql.sql is None:
continue
# Parameterize source_plugin instead of interpolating (fixes Issue 5)
source_key = f"_src_{i}"
all_params[source_key] = psql.source
rule_parts.append(
f"SELECT parent, child, allow, reason, :{source_key} AS source_plugin"
f" FROM ({psql.sql})"
)
if not rule_parts:
return None
return CollectedRules(
rules_union=" UNION ALL ".join(rule_parts),
params=all_params,
restriction_sqls=restriction_sqls,
)
```
This already fixes **Issue 5** (`source_plugin` injection) as a side effect.
#### Step 2: Extract `build_cascading_ctes()`
The core cascading logic — given a `base` CTE and an `all_rules` CTE, produce a `decisions` CTE — can be expressed as a single function that returns CTE SQL fragments:
```python
def build_cascading_ctes(
*,
rules_alias: str = "all_rules",
base_alias: str = "base",
include_reasons: bool = False,
) -> str:
"""
Return CTE SQL for child_lvl, parent_lvl, global_lvl, decisions.
Expects the caller to already have defined CTEs named `base_alias`
(with columns: parent, child) and `rules_alias` (with columns:
parent, child, allow, reason, source_plugin).
The output `decisions` CTE has columns:
parent, child, is_allowed, reason
Where `reason` is either a json_group_array (include_reasons=True)
or the single winning reason text.
"""
# The three level CTEs
level_ctes = []
for level_name, join_condition in [
("child_lvl", f"ar.parent = b.parent AND ar.child = b.child"),
("parent_lvl", f"ar.parent = b.parent AND ar.child IS NULL"),
("global_lvl", f"ar.parent IS NULL AND ar.child IS NULL"),
]:
reason_cols = ""
if include_reasons:
reason_cols = (
",\n json_group_array(CASE WHEN ar.allow = 0 "
"THEN ar.source_plugin || ': ' || ar.reason END) AS deny_reasons"
",\n json_group_array(CASE WHEN ar.allow = 1 "
"THEN ar.source_plugin || ': ' || ar.reason END) AS allow_reasons"
)
level_ctes.append(f"""{level_name} AS (
SELECT b.parent, b.child,
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow{reason_cols}
FROM {base_alias} b
LEFT JOIN {rules_alias} ar ON {join_condition}
GROUP BY b.parent, b.child
)""")
# The decisions CTE
if include_reasons:
reason_case = """
CASE
WHEN cl.any_deny = 1 THEN cl.deny_reasons
WHEN cl.any_allow = 1 THEN cl.allow_reasons
WHEN pl.any_deny = 1 THEN pl.deny_reasons
WHEN pl.any_allow = 1 THEN pl.allow_reasons
WHEN gl.any_deny = 1 THEN gl.deny_reasons
WHEN gl.any_allow = 1 THEN gl.allow_reasons
ELSE '[]'
END AS reason"""
else:
reason_case = "'implicit deny' AS reason" # simple placeholder
null_safe_join = (
"b.parent = {a}.parent AND "
"(b.child = {a}.child OR (b.child IS NULL AND {a}.child IS NULL))"
)
decisions_cte = f"""decisions AS (
SELECT
b.parent, b.child,
CASE
WHEN cl.any_deny = 1 THEN 0
WHEN cl.any_allow = 1 THEN 1
WHEN pl.any_deny = 1 THEN 0
WHEN pl.any_allow = 1 THEN 1
WHEN gl.any_deny = 1 THEN 0
WHEN gl.any_allow = 1 THEN 1
ELSE 0
END AS is_allowed,
{reason_case}
FROM {base_alias} b
JOIN child_lvl cl ON {null_safe_join.format(a='cl')}
JOIN parent_lvl pl ON {null_safe_join.format(a='pl')}
JOIN global_lvl gl ON {null_safe_join.format(a='gl')}
)"""
return ",\n".join(level_ctes) + ",\n" + decisions_cte
```
#### Step 3: Extract `build_restriction_filter()`
Restriction handling is also duplicated. A single function can generate the restriction CTE and WHERE clause:
```python
def build_restriction_filter(restriction_sqls: list[str]) -> tuple[str, str]:
"""
Returns (cte_sql, where_clause) for restriction filtering.
cte_sql: ", restriction_list AS (...)" to append to WITH block
where_clause: "AND EXISTS (...)" to append to WHERE
"""
restriction_intersect = "\nINTERSECT\n".join(
f"SELECT * FROM ({sql})" for sql in restriction_sqls
)
cte_sql = f",\nrestriction_list AS (\n {restriction_intersect}\n)"
where_clause = """
AND EXISTS (
SELECT 1 FROM restriction_list r
WHERE (r.parent = decisions.parent OR r.parent IS NULL)
AND (r.child = decisions.child OR r.child IS NULL)
)"""
return cte_sql, where_clause
```
#### Step 4: Rewrite the three callers
**`_build_single_action_sql()` (bulk resources)**
```python
async def _build_single_action_sql(datasette, actor, action, *, parent=None,
include_is_private=False):
action_obj = datasette.actions.get(action)
base_resources_sql = await action_obj.resource_class.resources_sql(datasette)
permission_sqls = await gather_permission_sql_from_hooks(...)
if permission_sqls is SKIP_PERMISSION_CHECKS:
return ... # early return unchanged
collected = collect_permission_rules(permission_sqls)
if collected is None:
return ... # empty result unchanged
all_params = collected.params
# Build WITH clause
cte_parts = [
f"WITH\nbase AS (\n {base_resources_sql}\n)",
f"all_rules AS (\n {collected.rules_union}\n)",
]
# Anonymous rules for is_private (if needed)
if include_is_private:
anon_collected = ... # same anon logic as before, but using collect_permission_rules
cte_parts.append(f"anon_rules AS (\n {anon_collected.rules_union}\n)")
# Core cascading logic — ONE call
cte_parts.append(build_cascading_ctes(include_reasons=True))
if include_is_private:
cte_parts.append(build_cascading_ctes(
rules_alias="anon_rules",
base_alias="base",
# Use different CTE names to avoid collision:
# This variant would need a prefix parameter, e.g. prefix="anon_"
))
# ... or simpler: call a second time with aliased names
# Restriction filter
restriction_cte = ""
restriction_where = ""
if collected.restriction_sqls:
restriction_cte, restriction_where = build_restriction_filter(
collected.restriction_sqls
)
# Final SELECT
select_cols = "parent, child, reason"
if include_is_private:
select_cols += ", is_private"
query = (
",\n".join(cte_parts) + restriction_cte +
f"\nSELECT {select_cols}\nFROM decisions\nWHERE is_allowed = 1"
+ restriction_where
+ (f"\n AND parent = :filter_parent" if parent else "")
+ "\nORDER BY parent, child"
)
return query, all_params
```
**`check_permission_for_resource()` (single resource)**
This can now be rewritten to use the same `build_cascading_ctes()` with a single-row `base`:
```python
async def check_permission_for_resource(*, datasette, actor, action, parent, child):
rules_union, all_params, restriction_sqls = await build_permission_rules_sql(
datasette, actor, action
)
if not rules_union:
return False
all_params["_check_parent"] = parent
all_params["_check_child"] = child
# Check restrictions first (unchanged fast-path)
if restriction_sqls:
... # existing restriction check, unchanged
# Use the shared cascading logic with a single-row base
base_sql = "SELECT :_check_parent AS parent, :_check_child AS child"
cascade = build_cascading_ctes()
query = f"""
WITH
base AS ({base_sql}),
all_rules AS ({rules_union}),
{cascade}
SELECT COALESCE((SELECT is_allowed FROM decisions), 0) AS is_allowed
"""
result = await datasette.get_internal_database().execute(query, all_params)
return bool(result.rows[0][0]) if result.rows else False
```
This replaces the current depth/ROW_NUMBER approach with the same `child_lvl`/`parent_lvl`/`global_lvl` pattern, ensuring identical semantics.
**`resolve_permissions_from_catalog()` (test utility)**
This becomes a thin wrapper too. Since it's test-only, the main benefit is eliminating 250 lines of duplicated SQL:
```python
async def resolve_permissions_from_catalog(db, actor, plugins, action,
candidate_sql, candidate_params=None,
*, implicit_deny=True):
# Resolve plugins (existing code, unchanged)
resolved_plugins, restriction_sqls = ...
union_sql, rule_params = build_rules_union(actor, resolved_plugins)
all_params = {**(candidate_params or {}), **rule_params, "action": action}
cascade = build_cascading_ctes(
include_reasons=True,
base_alias="cands",
rules_alias="rules",
)
# One query, no duplication
restriction_cte = ""
restriction_where = ""
if restriction_sqls:
restriction_cte, restriction_where = build_restriction_filter(restriction_sqls)
sql = f"""
WITH
cands AS ({candidate_sql}),
rules AS ({union_sql}),
{cascade}
{restriction_cte}
SELECT
c.parent, c.child,
COALESCE(d.is_allowed, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) AS allow,
d.reason, :action AS action,
...
FROM cands c
LEFT JOIN decisions d ON c.parent = d.parent AND c.child = d.child
{restriction_where}
ORDER BY c.parent, c.child
"""
rows = await db.execute(sql, {**all_params, "implicit_deny": ...})
return [dict(r) for r in rows]
```
This eliminates the 135-line SQL triplication entirely.
### The `include_is_private` complication
The `include_is_private` path is the one wrinkle. It needs to run the cascading logic twice: once for the real actor and once for anonymous. The current code duplicates all three level CTEs with `anon_` prefixes.
With the shared builder, we'd need `build_cascading_ctes()` to accept a `prefix` parameter so it can generate `anon_child_lvl`, `anon_parent_lvl`, etc.:
```python
def build_cascading_ctes(*, rules_alias="all_rules", base_alias="base",
include_reasons=False, prefix=""):
# Use prefix for all CTE names:
# f"{prefix}child_lvl", f"{prefix}parent_lvl", etc.
```
Then the caller does:
```python
ctes = build_cascading_ctes(include_reasons=True) # -> child_lvl, decisions
ctes += build_cascading_ctes(rules_alias="anon_rules", # -> anon_child_lvl, anon_decisions
prefix="anon_",
include_reasons=False)
```
And the final SELECT joins both `decisions` and `anon_decisions`.
### Impact summary
| Before | After |
|--------|-------|
| `_build_single_action_sql`: ~180 lines of CTE construction | ~40 lines + shared builder |
| `check_permission_for_resource`: ~35 lines of cascading SQL | ~10 lines + shared builder |
| `resolve_permissions_from_catalog`: ~250 lines, SQL tripled when restrictions present | ~30 lines + shared builder |
| `source_plugin` interpolated unsafely in 3 places | Parameterized in `collect_permission_rules()` |
| `build_rules_union` in `permissions.py` (test-only duplicate) | Replaced by shared `collect_permission_rules()` |
Total: ~465 lines of SQL-building code reduced to ~80 lines of callers + ~80 lines of shared builders. Three implementations of cascading logic become one.
### Migration plan
1. Add `collect_permission_rules()` and `build_cascading_ctes()` and `build_restriction_filter()` to `actions_sql.py` (or a new `datasette/utils/permission_sql_builder.py`)
2. Rewrite `check_permission_for_resource()` to use the shared builder
3. Rewrite `_build_single_action_sql()` to use the shared builder (including `include_is_private` prefix support)
4. Rewrite `resolve_permissions_from_catalog()` to use the shared builder
5. Delete `build_rules_union()` from `permissions.py`
6. Run the full test suite — all 263 tests must still pass since behavior is unchanged
7. Verify via `?_trace=1` that the generated SQL is correct and equivalently performant