Ensure :actor, :actor_id and :action are all available to permissions SQL, closes #2520

- Updated build_rules_union() to accept actor as dict and provide :actor (JSON) and :actor_id
- Updated resolve_permissions_from_catalog() and resolve_permissions_with_candidates() to accept actor dict
- :actor is now the full actor dict as JSON (use json_extract() to access fields)
- :actor_id is the actor's id field for simple comparisons
- :action continues to be available as before
- Updated all call sites and tests to use new parameter format
- Added test demonstrating all three parameters working together
This commit is contained in:
Simon Willison 2025-10-23 09:48:55 -07:00
commit 65c427e4ee
3 changed files with 105 additions and 26 deletions

View file

@ -1,6 +1,7 @@
# perm_utils.py
from __future__ import annotations
import json
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import sqlite3
@ -36,17 +37,19 @@ PluginOrFactory = Union[PermissionSQL, PluginProvider]
def build_rules_union(
actor: str, plugins: Sequence[PermissionSQL]
actor: Optional[dict], plugins: Sequence[PermissionSQL]
) -> Tuple[str, Dict[str, Any]]:
"""
Compose plugin SQL into a UNION ALL with namespaced parameters.
Returns:
union_sql: a SELECT with columns (parent, child, allow, reason, source_plugin)
params: dict of bound parameters including :actor and namespaced plugin params
params: dict of bound parameters including :actor (JSON), :actor_id, and namespaced plugin params
"""
parts: List[str] = []
params: Dict[str, Any] = {"actor": actor}
actor_json = json.dumps(actor) if actor else None
actor_id = actor.get("id") if actor else None
params: Dict[str, Any] = {"actor": actor_json, "actor_id": actor_id}
for i, p in enumerate(plugins):
rewrite, ns_params = _namespace_params(i, p.params)
@ -77,7 +80,7 @@ def build_rules_union(
async def resolve_permissions_from_catalog(
db,
actor: str,
actor: Optional[dict],
plugins: Sequence[PluginOrFactory],
action: str,
candidate_sql: str,
@ -95,6 +98,7 @@ async def resolve_permissions_from_catalog(
where rows is an iterable of sqlite3.Row
- plugins are either PermissionSQL objects or callables accepting (action: str)
and returning PermissionSQL instances selecting (parent, child, allow, reason)
- actor is the actor dict (or None), made available as :actor (JSON), :actor_id, and :action
Decision policy:
1) Specificity first: child (depth=2) > parent (depth=1) > root (depth=0)
@ -121,7 +125,6 @@ async def resolve_permissions_from_catalog(
all_params = {
**(candidate_params or {}),
**rule_params,
"actor": actor,
"action": action,
}
@ -191,7 +194,7 @@ async def resolve_permissions_from_catalog(
async def resolve_permissions_with_candidates(
db,
actor: str,
actor: Optional[dict],
plugins: Sequence[PluginOrFactory],
candidates: List[Tuple[str, Optional[str]]],
action: str,
@ -203,6 +206,7 @@ async def resolve_permissions_with_candidates(
the candidates as a UNION of parameterized SELECTs in a CTE.
candidates: list of (parent, child) where child can be None for parent-scoped actions.
actor: actor dict (or None), made available as :actor (JSON), :actor_id, and :action
"""
# Build a small CTE for candidates.
cand_rows_sql: List[str] = []

View file

@ -312,10 +312,9 @@ class AllowedResourcesView(BaseView):
continue
plugins.append(candidate)
actor_id = actor.get("id") if actor else None
rows = await resolve_permissions_from_catalog(
db,
actor=str(actor_id) if actor_id is not None else "",
actor=actor,
plugins=plugins,
action=action,
candidate_sql=candidate_sql,