From c9429466cd7588a90b41fc4eb1152f1be01fdac0 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 6 Feb 2026 02:10:36 +0000 Subject: [PATCH] Remove test-only cascading logic implementation, fix params bug Three changes: 1. Rewrite test_utils_permissions.py to exercise production code paths (allowed_resources / allowed) instead of the test-only resolve_permissions_from_catalog function. Tests now register plugins via ds.pm.register and call the real Datasette methods. 2. Remove resolve_permissions_from_catalog, resolve_permissions_with_candidates, and build_rules_union from datasette/utils/permissions.py. These were only used by tests and implemented the cascading logic a third time, independently of the two production implementations. 3. Fix bug in gather_permission_sql_from_hooks where empty params dicts ({}) would cause framework-injected params (:actor_id, :actor, :action) to be silently lost. The expression `params = permission_sql.params or {}` creates a new dict when params is {} (falsy), so setdefault writes to a throwaway dict. Fixed by explicitly checking `is None`. https://claude.ai/code/session_013EkyroQKPhcjdMbpHc9g4X --- datasette/utils/permissions.py | 367 +--------- tests/test_utils_permissions.py | 1120 ++++++++++++++++--------------- 2 files changed, 580 insertions(+), 907 deletions(-) diff --git a/datasette/utils/permissions.py b/datasette/utils/permissions.py index 6c30a12a..4f4858d0 100644 --- a/datasette/utils/permissions.py +++ b/datasette/utils/permissions.py @@ -2,8 +2,7 @@ from __future__ import annotations import json -from typing import Any, Dict, Iterable, List, Sequence, Tuple -import sqlite3 +from typing import Any, Iterable, List from datasette.permissions import PermissionSQL from datasette.plugins import pm @@ -46,10 +45,11 @@ async def gather_permission_sql_from_hooks( for permission_sql in _iter_permission_sql_from_result(resolved, action=action): if not permission_sql.source: permission_sql.source = default_source - params = permission_sql.params or {} - params.setdefault("action", action) - params.setdefault("actor", actor_json) - params.setdefault("actor_id", actor_id) + if permission_sql.params is None: + permission_sql.params = {} + permission_sql.params.setdefault("action", action) + permission_sql.params.setdefault("actor", actor_json) + permission_sql.params.setdefault("actor_id", actor_id) collected.append(permission_sql) return collected @@ -82,358 +82,3 @@ def _iter_permission_sql_from_result( raise TypeError( "Plugin providers must return PermissionSQL instances, sequences, or callables" ) - - -# ----------------------------- -# Plugin interface & utilities -# ----------------------------- - - -def build_rules_union( - actor: dict | None, plugins: Sequence[PermissionSQL] -) -> Tuple[str, Dict[str, Any]]: - """ - Compose plugin SQL into a UNION ALL. - - Returns: - union_sql: a SELECT with columns (parent, child, allow, reason, source_plugin) - params: dict of bound parameters including :actor (JSON), :actor_id, and plugin params - - Note: Plugins are responsible for ensuring their parameter names don't conflict. - The system reserves these parameter names: :actor, :actor_id, :action, :filter_parent - Plugin parameters should be prefixed with a unique identifier (e.g., source name). - """ - parts: List[str] = [] - actor_json = json.dumps(actor) if actor else None - actor_id = actor.get("id") if actor else None - params: Dict[str, Any] = {"actor": actor_json, "actor_id": actor_id} - - for p in plugins: - # No namespacing - just use plugin params as-is - params.update(p.params or {}) - - # Skip plugins that only provide restriction_sql (no permission rules) - if p.sql is None: - continue - - parts.append( - f""" - SELECT parent, child, allow, reason, '{p.source}' AS source_plugin FROM ( - {p.sql} - ) - """.strip() - ) - - if not parts: - # Empty UNION that returns no rows - union_sql = "SELECT NULL parent, NULL child, NULL allow, NULL reason, 'none' source_plugin WHERE 0" - else: - union_sql = "\nUNION ALL\n".join(parts) - - return union_sql, params - - -# ----------------------------------------------- -# Core resolvers (no temp tables, no custom UDFs) -# ----------------------------------------------- - - -async def resolve_permissions_from_catalog( - db, - actor: dict | None, - plugins: Sequence[Any], - action: str, - candidate_sql: str, - candidate_params: Dict[str, Any] | None = None, - *, - implicit_deny: bool = True, -) -> List[Dict[str, Any]]: - """ - Resolve permissions by embedding the provided *candidate_sql* in a CTE. - - Expectations: - - candidate_sql SELECTs: parent TEXT, child TEXT - (Use child=NULL for parent-scoped actions like "execute-sql".) - - *db* exposes: rows = await db.execute(sql, params) - where rows is an iterable of sqlite3.Row - - plugins: hook results handled by await_me_maybe - can be sync/async, - single PermissionSQL, list, or callable returning PermissionSQL - - actor is the actor dict (or None), made available as :actor (JSON), :actor_id, and :action - - Decision policy: - 1) Specificity first: child (depth=2) > parent (depth=1) > root (depth=0) - 2) Within the same depth: deny (0) beats allow (1) - 3) If no matching rule: - - implicit_deny=True -> treat as allow=0, reason='implicit deny' - - implicit_deny=False -> allow=None, reason=None - - Returns: list of dict rows - - parent, child, allow, reason, source_plugin, depth - - resource (rendered "/parent/child" or "/parent" or "/") - """ - resolved_plugins: List[PermissionSQL] = [] - restriction_sqls: List[str] = [] - - for plugin in plugins: - if callable(plugin) and not isinstance(plugin, PermissionSQL): - resolved = plugin(action) # type: ignore[arg-type] - else: - resolved = plugin # type: ignore[assignment] - if not isinstance(resolved, PermissionSQL): - raise TypeError("Plugin providers must return PermissionSQL instances") - resolved_plugins.append(resolved) - - # Collect restriction SQL filters - if resolved.restriction_sql: - restriction_sqls.append(resolved.restriction_sql) - - union_sql, rule_params = build_rules_union(actor, resolved_plugins) - all_params = { - **(candidate_params or {}), - **rule_params, - "action": action, - } - - sql = f""" - WITH - cands AS ( - {candidate_sql} - ), - rules AS ( - {union_sql} - ), - matched AS ( - SELECT - c.parent, c.child, - r.allow, r.reason, r.source_plugin, - CASE - WHEN r.child IS NOT NULL THEN 2 -- child-level (most specific) - WHEN r.parent IS NOT NULL THEN 1 -- parent-level - ELSE 0 -- root/global - END AS depth - FROM cands c - JOIN rules r - ON (r.parent IS NULL OR r.parent = c.parent) - AND (r.child IS NULL OR r.child = c.child) - ), - ranked AS ( - SELECT *, - ROW_NUMBER() OVER ( - PARTITION BY parent, child - ORDER BY - depth DESC, -- specificity first - CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth - source_plugin -- stable tie-break - ) AS rn - FROM matched - ), - winner AS ( - SELECT parent, child, - allow, reason, source_plugin, depth - FROM ranked WHERE rn = 1 - ) - SELECT - c.parent, c.child, - COALESCE(w.allow, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) AS allow, - COALESCE(w.reason, CASE WHEN :implicit_deny THEN 'implicit deny' ELSE NULL END) AS reason, - w.source_plugin, - COALESCE(w.depth, -1) AS depth, - :action AS action, - CASE - WHEN c.parent IS NULL THEN '/' - WHEN c.child IS NULL THEN '/' || c.parent - ELSE '/' || c.parent || '/' || c.child - END AS resource - FROM cands c - LEFT JOIN winner w - ON ((w.parent = c.parent) OR (w.parent IS NULL AND c.parent IS NULL)) - AND ((w.child = c.child ) OR (w.child IS NULL AND c.child IS NULL)) - ORDER BY c.parent, c.child - """ - - # If there are restriction filters, wrap the query with INTERSECT - # This ensures only resources in the restriction allowlist are returned - if restriction_sqls: - # Start with the main query, but select only parent/child for the INTERSECT - main_query_for_intersect = f""" - WITH - cands AS ( - {candidate_sql} - ), - rules AS ( - {union_sql} - ), - matched AS ( - SELECT - c.parent, c.child, - r.allow, r.reason, r.source_plugin, - CASE - WHEN r.child IS NOT NULL THEN 2 -- child-level (most specific) - WHEN r.parent IS NOT NULL THEN 1 -- parent-level - ELSE 0 -- root/global - END AS depth - FROM cands c - JOIN rules r - ON (r.parent IS NULL OR r.parent = c.parent) - AND (r.child IS NULL OR r.child = c.child) - ), - ranked AS ( - SELECT *, - ROW_NUMBER() OVER ( - PARTITION BY parent, child - ORDER BY - depth DESC, -- specificity first - CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth - source_plugin -- stable tie-break - ) AS rn - FROM matched - ), - winner AS ( - SELECT parent, child, - allow, reason, source_plugin, depth - FROM ranked WHERE rn = 1 - ), - permitted_resources AS ( - SELECT c.parent, c.child - FROM cands c - LEFT JOIN winner w - ON ((w.parent = c.parent) OR (w.parent IS NULL AND c.parent IS NULL)) - AND ((w.child = c.child ) OR (w.child IS NULL AND c.child IS NULL)) - WHERE COALESCE(w.allow, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) = 1 - ) - SELECT parent, child FROM permitted_resources - """ - - # Build restriction list with INTERSECT (all must match) - # Then filter to resources that match hierarchically - # Wrap each restriction_sql in a subquery to avoid operator precedence issues - # with UNION ALL inside the restriction SQL statements - restriction_intersect = "\nINTERSECT\n".join( - f"SELECT * FROM ({sql})" for sql in restriction_sqls - ) - - # Combine: resources allowed by permissions AND in restriction allowlist - # Database-level restrictions (parent, NULL) should match all children (parent, *) - filtered_resources = f""" - WITH restriction_list AS ( - {restriction_intersect} - ), - permitted AS ( - {main_query_for_intersect} - ), - filtered AS ( - SELECT p.parent, p.child - FROM permitted p - WHERE EXISTS ( - SELECT 1 FROM restriction_list r - WHERE (r.parent = p.parent OR r.parent IS NULL) - AND (r.child = p.child OR r.child IS NULL) - ) - ) - """ - - # Now join back to get full results for only the filtered resources - sql = f""" - {filtered_resources} - , cands AS ( - {candidate_sql} - ), - rules AS ( - {union_sql} - ), - matched AS ( - SELECT - c.parent, c.child, - r.allow, r.reason, r.source_plugin, - CASE - WHEN r.child IS NOT NULL THEN 2 -- child-level (most specific) - WHEN r.parent IS NOT NULL THEN 1 -- parent-level - ELSE 0 -- root/global - END AS depth - FROM cands c - JOIN rules r - ON (r.parent IS NULL OR r.parent = c.parent) - AND (r.child IS NULL OR r.child = c.child) - ), - ranked AS ( - SELECT *, - ROW_NUMBER() OVER ( - PARTITION BY parent, child - ORDER BY - depth DESC, -- specificity first - CASE WHEN allow=0 THEN 0 ELSE 1 END, -- then deny over allow at same depth - source_plugin -- stable tie-break - ) AS rn - FROM matched - ), - winner AS ( - SELECT parent, child, - allow, reason, source_plugin, depth - FROM ranked WHERE rn = 1 - ) - SELECT - c.parent, c.child, - COALESCE(w.allow, CASE WHEN :implicit_deny THEN 0 ELSE NULL END) AS allow, - COALESCE(w.reason, CASE WHEN :implicit_deny THEN 'implicit deny' ELSE NULL END) AS reason, - w.source_plugin, - COALESCE(w.depth, -1) AS depth, - :action AS action, - CASE - WHEN c.parent IS NULL THEN '/' - WHEN c.child IS NULL THEN '/' || c.parent - ELSE '/' || c.parent || '/' || c.child - END AS resource - FROM filtered c - LEFT JOIN winner w - ON ((w.parent = c.parent) OR (w.parent IS NULL AND c.parent IS NULL)) - AND ((w.child = c.child ) OR (w.child IS NULL AND c.child IS NULL)) - ORDER BY c.parent, c.child - """ - - rows_iter: Iterable[sqlite3.Row] = await db.execute( - sql, - {**all_params, "implicit_deny": 1 if implicit_deny else 0}, - ) - return [dict(r) for r in rows_iter] - - -async def resolve_permissions_with_candidates( - db, - actor: dict | None, - plugins: Sequence[Any], - candidates: List[Tuple[str, str | None]], - action: str, - *, - implicit_deny: bool = True, -) -> List[Dict[str, Any]]: - """ - Resolve permissions without any external candidate table by embedding - the candidates as a UNION of parameterized SELECTs in a CTE. - - candidates: list of (parent, child) where child can be None for parent-scoped actions. - actor: actor dict (or None), made available as :actor (JSON), :actor_id, and :action - """ - # Build a small CTE for candidates. - cand_rows_sql: List[str] = [] - cand_params: Dict[str, Any] = {} - for i, (parent, child) in enumerate(candidates): - pkey = f"cand_p_{i}" - ckey = f"cand_c_{i}" - cand_params[pkey] = parent - cand_params[ckey] = child - cand_rows_sql.append(f"SELECT :{pkey} AS parent, :{ckey} AS child") - candidate_sql = ( - "\nUNION ALL\n".join(cand_rows_sql) - if cand_rows_sql - else "SELECT NULL AS parent, NULL AS child WHERE 0" - ) - - return await resolve_permissions_from_catalog( - db, - actor, - plugins, - action, - candidate_sql=candidate_sql, - candidate_params=cand_params, - implicit_deny=implicit_deny, - ) diff --git a/tests/test_utils_permissions.py b/tests/test_utils_permissions.py index b412de0f..8105e8f4 100644 --- a/tests/test_utils_permissions.py +++ b/tests/test_utils_permissions.py @@ -1,612 +1,640 @@ +""" +Tests for cascading permission resolution logic. + +These tests verify the core cascading semantics through the production +code paths (allowed_resources / allowed) rather than through a separate +test-only SQL builder. Every test registers a lightweight plugin via +``ds.pm.register`` and calls the real ``Datasette.allowed_resources()`` +and/or ``Datasette.allowed()`` methods. + +Cascading semantics tested: + 1. child (depth 2) > parent (depth 1) > global (depth 0) + 2. DENY beats ALLOW at the same depth + 3. No matching rule → implicit deny + 4. Multiple plugins can contribute rules with independent parameters + 5. :actor, :actor_id, :action are available in SQL +""" + import pytest +import pytest_asyncio from datasette.app import Datasette from datasette.permissions import PermissionSQL -from datasette.utils.permissions import resolve_permissions_from_catalog -from typing import Callable, List +from datasette.resources import TableResource, DatabaseResource +from datasette import hookimpl -@pytest.fixture -def db(): - ds = Datasette() - import tempfile - from datasette.database import Database +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- - path = tempfile.mktemp(suffix="demo.db") - db = ds.add_database(Database(ds, path=path)) - return db +class PermissionRulesPlugin: + """Thin shim that delegates to a callback for permission_resources_sql.""" + + def __init__(self, rules_callback): + self.rules_callback = rules_callback + + @hookimpl + def permission_resources_sql(self, datasette, actor, action): + return self.rules_callback(datasette, actor, action) -NO_RULES_SQL = ( - "SELECT NULL AS parent, NULL AS child, NULL AS allow, NULL AS reason WHERE 0" -) +@pytest_asyncio.fixture +async def ds(): + """ + Create a Datasette instance with catalog tables that mirror the + original test_utils_permissions layout: + + databases: perm_accounting, perm_hr, perm_analytics + tables per db: table01..table10 + special tables: perm_accounting/sales, perm_analytics/secret + + Uses default_deny=True so that only the test-registered plugins + determine permission outcomes (no built-in default-allow rules). + + Database names are prefixed with ``perm_`` to avoid collisions with + other test fixtures that create memory databases in the same process. + """ + instance = Datasette(default_deny=True) + await instance.invoke_startup() + + per_parent = 10 + parents = ["perm_accounting", "perm_hr", "perm_analytics"] + specials = {"perm_accounting": ["sales"], "perm_analytics": ["secret"], "perm_hr": []} + + for parent in parents: + db = instance.add_memory_database(parent) + base_tables = [f"table{i:02d}" for i in range(1, per_parent + 1)] + for s in specials.get(parent, []): + if s not in base_tables: + base_tables[0] = s + for tbl in base_tables: + await db.execute_write( + f"CREATE TABLE IF NOT EXISTS [{tbl}] (id INTEGER PRIMARY KEY)" + ) + + await instance._refresh_schemas() + yield instance + # Cleanup: remove databases to avoid polluting other tests + for parent in parents: + instance.remove_database(parent) -def plugin_allow_all_for_user(user: str) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: +# --------------------------------------------------------------------------- +# Plugin factories — return callables suitable for PermissionRulesPlugin +# --------------------------------------------------------------------------- + +def _cb_allow_all_for_user(user): + """Global allow for a specific user.""" + def cb(datasette, actor, action): + if not actor or actor.get("id") != user: + return None return PermissionSQL( - """ - SELECT NULL AS parent, NULL AS child, 1 AS allow, - 'global allow for ' || :allow_all_user || ' on ' || :allow_all_action AS reason - WHERE :actor_id = :allow_all_user - """, - {"allow_all_user": user, "allow_all_action": action}, + sql=( + "SELECT NULL AS parent, NULL AS child, 1 AS allow, " + "'global allow for ' || :_aau_user || ' on ' || :action AS reason" + ), + params={"_aau_user": user}, + ) + return cb + + +def _cb_deny_specific_table(user, parent, child): + """Child-level deny for a specific user + table.""" + def cb(datasette, actor, action): + if not actor or actor.get("id") != user: + return None + return PermissionSQL( + sql=( + "SELECT :_dst_parent AS parent, :_dst_child AS child, 0 AS allow, " + "'deny ' || :_dst_parent || '/' || :_dst_child || ' for ' || :_dst_user AS reason" + ), + params={"_dst_parent": parent, "_dst_child": child, "_dst_user": user}, + ) + return cb + + +def _cb_org_policy_deny_parent(parent): + """Unconditional parent-level deny (applies to all actors).""" + def cb(datasette, actor, action): + return PermissionSQL( + sql=( + "SELECT :_opd_parent AS parent, NULL AS child, 0 AS allow, " + "'org policy: deny ' || :_opd_parent AS reason" + ), + params={"_opd_parent": parent}, + ) + return cb + + +def _cb_allow_parent_for_user(user, parent): + """Parent-level allow for a specific user.""" + def cb(datasette, actor, action): + if not actor or actor.get("id") != user: + return None + return PermissionSQL( + sql=( + "SELECT :_apu_parent AS parent, NULL AS child, 1 AS allow, " + "'allow parent ' || :_apu_parent || ' for ' || :_apu_user AS reason" + ), + params={"_apu_parent": parent, "_apu_user": user}, + ) + return cb + + +def _cb_child_allow_for_user(user, parent, child): + """Child-level allow for a specific user.""" + def cb(datasette, actor, action): + if not actor or actor.get("id") != user: + return None + return PermissionSQL( + sql=( + "SELECT :_cau_parent AS parent, :_cau_child AS child, 1 AS allow, " + "'allow child ' || :_cau_parent || '/' || :_cau_child || ' for ' || :_cau_user AS reason" + ), + params={"_cau_parent": parent, "_cau_child": child, "_cau_user": user}, + ) + return cb + + +def _cb_root_deny_for_all(): + """Unconditional global deny.""" + def cb(datasette, actor, action): + return PermissionSQL( + sql=( + "SELECT NULL AS parent, NULL AS child, 0 AS allow, " + "'root deny for all' AS reason" + ), + ) + return cb + + +def _cb_conflicting_same_child_rules(user, parent, child): + """Two plugins: one allow + one deny at the same child level.""" + def cb_allow(datasette, actor, action): + if not actor or actor.get("id") != user: + return None + return PermissionSQL( + sql=( + "SELECT :_csca_parent AS parent, :_csca_child AS child, 1 AS allow, " + "'team grant at child' AS reason" + ), + params={"_csca_parent": parent, "_csca_child": child}, ) - return provider - - -def plugin_deny_specific_table( - user: str, parent: str, child: str -) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: + def cb_deny(datasette, actor, action): + if not actor or actor.get("id") != user: + return None return PermissionSQL( - """ - SELECT :deny_specific_table_parent AS parent, :deny_specific_table_child AS child, 0 AS allow, - 'deny ' || :deny_specific_table_parent || '/' || :deny_specific_table_child || ' for ' || :deny_specific_table_user || ' on ' || :deny_specific_table_action AS reason - WHERE :actor_id = :deny_specific_table_user - """, - { - "deny_specific_table_parent": parent, - "deny_specific_table_child": child, - "deny_specific_table_user": user, - "deny_specific_table_action": action, - }, + sql=( + "SELECT :_cscd_parent AS parent, :_cscd_child AS child, 0 AS allow, " + "'exception deny at child' AS reason" + ), + params={"_cscd_parent": parent, "_cscd_child": child}, ) - return provider + return cb_allow, cb_deny -def plugin_org_policy_deny_parent(parent: str) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - return PermissionSQL( - """ - SELECT :org_policy_parent_deny_parent AS parent, NULL AS child, 0 AS allow, - 'org policy: parent ' || :org_policy_parent_deny_parent || ' denied on ' || :org_policy_parent_deny_action AS reason - """, - { - "org_policy_parent_deny_parent": parent, - "org_policy_parent_deny_action": action, - }, - ) - - return provider - - -def plugin_allow_parent_for_user( - user: str, parent: str -) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - return PermissionSQL( - """ - SELECT :allow_parent_parent AS parent, NULL AS child, 1 AS allow, - 'allow full parent for ' || :allow_parent_user || ' on ' || :allow_parent_action AS reason - WHERE :actor_id = :allow_parent_user - """, - { - "allow_parent_parent": parent, - "allow_parent_user": user, - "allow_parent_action": action, - }, - ) - - return provider - - -def plugin_child_allow_for_user( - user: str, parent: str, child: str -) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - return PermissionSQL( - """ - SELECT :allow_child_parent AS parent, :allow_child_child AS child, 1 AS allow, - 'allow child for ' || :allow_child_user || ' on ' || :allow_child_action AS reason - WHERE :actor_id = :allow_child_user - """, - { - "allow_child_parent": parent, - "allow_child_child": child, - "allow_child_user": user, - "allow_child_action": action, - }, - ) - - return provider - - -def plugin_root_deny_for_all() -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - return PermissionSQL( - """ - SELECT NULL AS parent, NULL AS child, 0 AS allow, 'root deny for all on ' || :root_deny_action AS reason - """, - {"root_deny_action": action}, - ) - - return provider - - -def plugin_conflicting_same_child_rules( - user: str, parent: str, child: str -) -> List[Callable[[str], PermissionSQL]]: - def allow_provider(action: str) -> PermissionSQL: - return PermissionSQL( - """ - SELECT :conflict_child_allow_parent AS parent, :conflict_child_allow_child AS child, 1 AS allow, - 'team grant at child for ' || :conflict_child_allow_user || ' on ' || :conflict_child_allow_action AS reason - WHERE :actor_id = :conflict_child_allow_user - """, - { - "conflict_child_allow_parent": parent, - "conflict_child_allow_child": child, - "conflict_child_allow_user": user, - "conflict_child_allow_action": action, - }, - ) - - def deny_provider(action: str) -> PermissionSQL: - return PermissionSQL( - """ - SELECT :conflict_child_deny_parent AS parent, :conflict_child_deny_child AS child, 0 AS allow, - 'exception deny at child for ' || :conflict_child_deny_user || ' on ' || :conflict_child_deny_action AS reason - WHERE :actor_id = :conflict_child_deny_user - """, - { - "conflict_child_deny_parent": parent, - "conflict_child_deny_child": child, - "conflict_child_deny_user": user, - "conflict_child_deny_action": action, - }, - ) - - return [allow_provider, deny_provider] - - -def plugin_allow_all_for_action( - user: str, allowed_action: str -) -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: +def _cb_allow_all_for_action(user, allowed_action): + """Global allow for a specific user on a specific action only.""" + def cb(datasette, actor, action): if action != allowed_action: - return PermissionSQL(NO_RULES_SQL) - # Sanitize parameter names by replacing hyphens with underscores - param_prefix = action.replace("-", "_") + return None + if not actor or actor.get("id") != user: + return None return PermissionSQL( - f""" - SELECT NULL AS parent, NULL AS child, 1 AS allow, - 'global allow for ' || :{param_prefix}_user || ' on ' || :{param_prefix}_action AS reason - WHERE :actor_id = :{param_prefix}_user - """, - {f"{param_prefix}_user": user, f"{param_prefix}_action": action}, + sql=( + "SELECT NULL AS parent, NULL AS child, 1 AS allow, " + "'global allow for ' || :_aafa_user || ' on ' || :action AS reason" + ), + params={"_aafa_user": user}, ) + return cb - return provider +# --------------------------------------------------------------------------- +# Helpers for asserting results +# --------------------------------------------------------------------------- + +def _allowed_set(resources): + """Convert PaginatedResources.resources to {(parent, child), ...}.""" + return {(r.parent, r.child) for r in resources} + + +def _allowed_set_for_parent(resources, parent): + return {(r.parent, r.child) for r in resources if r.parent == parent} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- VIEW_TABLE = "view-table" -# ---------- Catalog DDL (from your schema) ---------- -CATALOG_DDL = """ -CREATE TABLE IF NOT EXISTS catalog_databases ( - database_name TEXT PRIMARY KEY, - path TEXT, - is_memory INTEGER, - schema_version INTEGER -); -CREATE TABLE IF NOT EXISTS catalog_tables ( - database_name TEXT, - table_name TEXT, - rootpage INTEGER, - sql TEXT, - PRIMARY KEY (database_name, table_name), - FOREIGN KEY (database_name) REFERENCES catalog_databases(database_name) -); -""" - -PARENTS = ["accounting", "hr", "analytics"] -SPECIALS = {"accounting": ["sales"], "analytics": ["secret"], "hr": []} - -TABLE_CANDIDATES_SQL = ( - "SELECT database_name AS parent, table_name AS child FROM catalog_tables" -) -PARENT_CANDIDATES_SQL = ( - "SELECT database_name AS parent, NULL AS child FROM catalog_databases" -) - - -# ---------- Helpers ---------- -async def seed_catalog(db, per_parent: int = 10) -> None: - await db.execute_write_script(CATALOG_DDL) - # databases - db_rows = [(p, f"/{p}.db", 0, 1) for p in PARENTS] - await db.execute_write_many( - "INSERT OR REPLACE INTO catalog_databases(database_name, path, is_memory, schema_version) VALUES (?,?,?,?)", - db_rows, - ) - - # tables - def tables_for(parent: str, n: int): - base = [f"table{i:02d}" for i in range(1, n + 1)] - for s in SPECIALS.get(parent, []): - if s not in base: - base[0] = s - return base - - table_rows = [] - for p in PARENTS: - for t in tables_for(p, per_parent): - table_rows.append((p, t, 0, f"CREATE TABLE {t} (id INTEGER PRIMARY KEY)")) - await db.execute_write_many( - "INSERT OR REPLACE INTO catalog_tables(database_name, table_name, rootpage, sql) VALUES (?,?,?,?)", - table_rows, - ) - - -def res_allowed(rows, parent=None): - return sorted( - r["resource"] - for r in rows - if r["allow"] == 1 and (parent is None or r["parent"] == parent) - ) - - -def res_denied(rows, parent=None): - return sorted( - r["resource"] - for r in rows - if r["allow"] == 0 and (parent is None or r["parent"] == parent) - ) - - -# ---------- Tests ---------- @pytest.mark.asyncio -async def test_alice_global_allow_with_specific_denies_catalog(db): - await seed_catalog(db) - plugins = [ - plugin_allow_all_for_user("alice"), - plugin_deny_specific_table("alice", "accounting", "sales"), - plugin_org_policy_deny_parent("hr"), - ] - rows = await resolve_permissions_from_catalog( - db, - {"id": "alice"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - # Alice can see everything except accounting/sales and hr/* - assert "/accounting/sales" in res_denied(rows) - for r in rows: - if r["parent"] == "hr": - assert r["allow"] == 0 - elif r["resource"] == "/accounting/sales": - assert r["allow"] == 0 - else: - assert r["allow"] == 1 - - -@pytest.mark.asyncio -async def test_carol_parent_allow_but_child_conflict_deny_wins_catalog(db): - await seed_catalog(db) - plugins = [ - plugin_org_policy_deny_parent("hr"), - plugin_allow_parent_for_user("carol", "analytics"), - *plugin_conflicting_same_child_rules("carol", "analytics", "secret"), - ] - rows = await resolve_permissions_from_catalog( - db, - {"id": "carol"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - allowed_analytics = res_allowed(rows, parent="analytics") - denied_analytics = res_denied(rows, parent="analytics") - - assert "/analytics/secret" in denied_analytics - # 10 analytics children total, 1 denied - assert len(allowed_analytics) == 9 - - -@pytest.mark.asyncio -async def test_specificity_child_allow_overrides_parent_deny_catalog(db): - await seed_catalog(db) - plugins = [ - plugin_allow_all_for_user("alice"), - plugin_org_policy_deny_parent("analytics"), # parent-level deny - plugin_child_allow_for_user( - "alice", "analytics", "table02" - ), # child allow beats parent deny - ] - rows = await resolve_permissions_from_catalog( - db, - {"id": "alice"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - - # table02 allowed, other analytics tables denied - assert any(r["resource"] == "/analytics/table02" and r["allow"] == 1 for r in rows) - assert all( - (r["parent"] != "analytics" or r["child"] == "table02" or r["allow"] == 0) - for r in rows - ) - - -@pytest.mark.asyncio -async def test_root_deny_all_but_parent_allow_rescues_specific_parent_catalog(db): - await seed_catalog(db) - plugins = [ - plugin_root_deny_for_all(), # root deny - plugin_allow_parent_for_user( - "bob", "accounting" - ), # parent allow (more specific) - ] - rows = await resolve_permissions_from_catalog( - db, {"id": "bob"}, plugins, VIEW_TABLE, TABLE_CANDIDATES_SQL, implicit_deny=True - ) - for r in rows: - if r["parent"] == "accounting": - assert r["allow"] == 1 - else: - assert r["allow"] == 0 - - -@pytest.mark.asyncio -async def test_parent_scoped_candidates(db): - await seed_catalog(db) - plugins = [ - plugin_org_policy_deny_parent("hr"), - plugin_allow_parent_for_user("carol", "analytics"), - ] - rows = await resolve_permissions_from_catalog( - db, - {"id": "carol"}, - plugins, - VIEW_TABLE, - PARENT_CANDIDATES_SQL, - implicit_deny=True, - ) - d = {r["resource"]: r["allow"] for r in rows} - assert d["/analytics"] == 1 - assert d["/hr"] == 0 - - -@pytest.mark.asyncio -async def test_implicit_deny_behavior(db): - await seed_catalog(db) - plugins = [] # no rules at all - - # implicit_deny=True -> everything denied with reason 'implicit deny' - rows = await resolve_permissions_from_catalog( - db, - {"id": "erin"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - assert all(r["allow"] == 0 and r["reason"] == "implicit deny" for r in rows) - - # implicit_deny=False -> no winner => allow is None, reason is None - rows2 = await resolve_permissions_from_catalog( - db, - {"id": "erin"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=False, - ) - assert all(r["allow"] is None and r["reason"] is None for r in rows2) - - -@pytest.mark.asyncio -async def test_candidate_filters_via_params(db): - await seed_catalog(db) - # Add some metadata to test filtering - # Mark 'hr' as is_memory=1 and increment analytics schema_version - await db.execute_write( - "UPDATE catalog_databases SET is_memory=1 WHERE database_name='hr'" - ) - await db.execute_write( - "UPDATE catalog_databases SET schema_version=2 WHERE database_name='analytics'" - ) - - # Candidate SQL that filters by db metadata via params - candidate_sql = """ - SELECT t.database_name AS parent, t.table_name AS child - FROM catalog_tables t - JOIN catalog_databases d ON d.database_name = t.database_name - WHERE (:exclude_memory = 1 AND d.is_memory = 1) IS NOT 1 - AND (:min_schema_version IS NULL OR d.schema_version >= :min_schema_version) +async def test_alice_global_allow_with_specific_denies(ds): """ + Alice has global allow, but: + - accounting/sales is denied at child level + - hr/* is denied at parent level + She should see everything except those. + """ + # Combine three plugin callbacks into one that returns a list + deny_table_cb = _cb_deny_specific_table("alice", "perm_accounting", "sales") + deny_parent_cb = _cb_org_policy_deny_parent("perm_hr") + allow_cb = _cb_allow_all_for_user("alice") - plugins = [ - plugin_root_deny_for_all(), - plugin_allow_parent_for_user( - "dev", "analytics" - ), # analytics rescued if included by candidates - ] + def combined(datasette, actor, action): + results = [] + for cb in (allow_cb, deny_table_cb, deny_parent_cb): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results - # Case 1: exclude memory dbs, require schema_version >= 2 -> only analytics appear, and thus are allowed - rows = await resolve_permissions_from_catalog( - db, - {"id": "dev"}, - plugins, - VIEW_TABLE, - candidate_sql, - candidate_params={"exclude_memory": 1, "min_schema_version": 2}, - implicit_deny=True, - ) - assert rows and all(r["parent"] == "analytics" for r in rows) - assert all(r["allow"] == 1 for r in rows) + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") - # Case 2: include memory dbs, min_schema_version = None -> accounting/hr/analytics appear, - # but root deny wins except where specifically allowed (none except analytics parent allow doesn’t apply to table depth if candidate includes children; still fine—policy is explicit). - rows2 = await resolve_permissions_from_catalog( - db, - {"id": "dev"}, - plugins, - VIEW_TABLE, - candidate_sql, - candidate_params={"exclude_memory": 0, "min_schema_version": None}, - implicit_deny=True, - ) - assert any(r["parent"] == "accounting" for r in rows2) - assert any(r["parent"] == "hr" for r in rows2) - # For table-scoped candidates, the parent-level allow does not override root deny unless you have child-level rules - assert all(r["allow"] in (0, 1) for r in rows2) + try: + actor = {"id": "alice"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + allowed = _allowed_set(result.resources) + + # accounting/sales should be denied (child deny beats global allow) + assert ("perm_accounting", "sales") not in allowed + + # All hr tables should be denied (parent deny beats global allow) + hr_tables = {(p, c) for p, c in allowed if p == "perm_hr"} + assert len(hr_tables) == 0 + + # analytics tables should all be allowed + analytics_tables = {(p, c) for p, c in allowed if p == "perm_analytics"} + assert len(analytics_tables) == 10 + + # accounting tables (except sales) should be allowed + accounting_tables = {(p, c) for p, c in allowed if p == "perm_accounting"} + assert len(accounting_tables) == 9 # 10 - 1 denied + + # Verify with allowed() single-resource checks + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "sales"), + actor=actor, + ) + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_hr", "table01"), + actor=actor, + ) + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_action_specific_rules(db): - await seed_catalog(db) - plugins = [plugin_allow_all_for_action("dana", VIEW_TABLE)] - - view_rows = await resolve_permissions_from_catalog( - db, - {"id": "dana"}, - plugins, - VIEW_TABLE, - TABLE_CANDIDATES_SQL, - implicit_deny=True, +async def test_parent_allow_but_child_conflict_deny_wins(ds): + """ + Carol has parent-level allow on analytics, but there are conflicting + child-level allow + deny on analytics/secret. DENY should win. + hr is parent-level denied. + """ + cb_allow, cb_deny = _cb_conflicting_same_child_rules( + "carol", "perm_analytics", "secret" ) - assert view_rows and all(r["allow"] == 1 for r in view_rows) - assert all(r["action"] == VIEW_TABLE for r in view_rows) + allow_parent_cb = _cb_allow_parent_for_user("carol", "perm_analytics") + deny_parent_cb = _cb_org_policy_deny_parent("perm_hr") - insert_rows = await resolve_permissions_from_catalog( - db, - {"id": "dana"}, - plugins, - "insert-row", - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) - assert insert_rows and all(r["allow"] == 0 for r in insert_rows) - assert all(r["reason"] == "implicit deny" for r in insert_rows) - assert all(r["action"] == "insert-row" for r in insert_rows) + def combined(datasette, actor, action): + results = [] + for cb in (deny_parent_cb, allow_parent_cb, cb_allow, cb_deny): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "carol"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + analytics_allowed = _allowed_set_for_parent(result.resources, "perm_analytics") + + # analytics/secret should be denied (child deny beats child allow) + assert ("perm_analytics", "secret") not in analytics_allowed + # 10 analytics tables, 1 denied + assert len(analytics_allowed) == 9 + + # Verify via allowed() + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "secret"), + actor=actor, + ) + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "table02"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_actor_actor_id_action_parameters_available(db): - """Test that :actor (JSON), :actor_id, and :action are all available in SQL""" - await seed_catalog(db) +async def test_specificity_child_allow_overrides_parent_deny(ds): + """ + analytics is parent-level denied, but alice has a child-level allow + on analytics/table02. Child beats parent. + """ + allow_cb = _cb_allow_all_for_user("alice") + deny_parent_cb = _cb_org_policy_deny_parent("perm_analytics") + child_allow_cb = _cb_child_allow_for_user("alice", "perm_analytics", "table02") - def plugin_using_all_parameters() -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - return PermissionSQL( - """ + def combined(datasette, actor, action): + results = [] + for cb in (allow_cb, deny_parent_cb, child_allow_cb): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "alice"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + analytics_allowed = _allowed_set_for_parent(result.resources, "perm_analytics") + + # table02 should be allowed (child allow beats parent deny) + assert ("perm_analytics", "table02") in analytics_allowed + # All other analytics tables should be denied (parent deny, no child rule) + assert len(analytics_allowed) == 1 + + # Verify via allowed() + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "table02"), + actor=actor, + ) + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_analytics", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_root_deny_all_but_parent_allow_rescues_specific_parent(ds): + """ + Global deny for all, but bob has parent-level allow on accounting. + Parent beats global. + """ + deny_cb = _cb_root_deny_for_all() + allow_cb = _cb_allow_parent_for_user("bob", "perm_accounting") + + def combined(datasette, actor, action): + results = [] + for cb in (deny_cb, allow_cb): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "bob"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + allowed = _allowed_set(result.resources) + + # Only accounting tables should be allowed + assert all(p == "perm_accounting" for p, c in allowed) + assert len(allowed) == 10 + + # Verify via allowed() + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_hr", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_parent_scoped_action(ds): + """ + For parent-scoped resources (databases), verify cascading. + analytics allowed, hr denied, accounting implicitly denied. + """ + deny_cb = _cb_org_policy_deny_parent("perm_hr") + allow_cb = _cb_allow_parent_for_user("carol", "perm_analytics") + + def combined(datasette, actor, action): + results = [] + for cb in (deny_cb, allow_cb): + r = cb(datasette, actor, action) + if r is not None: + results.append(r) + return results + + plugin = PermissionRulesPlugin(combined) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "carol"} + result = await ds.allowed_resources("view-database", actor) + allowed = {r.parent for r in result.resources} + + assert "perm_analytics" in allowed + # hr is explicitly denied, accounting has no matching rule → implicit deny + assert "perm_hr" not in allowed + + # Verify via allowed() + assert await ds.allowed( + action="view-database", + resource=DatabaseResource("perm_analytics"), + actor=actor, + ) + assert not await ds.allowed( + action="view-database", + resource=DatabaseResource("perm_hr"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_implicit_deny_when_no_rules(ds): + """ + When no plugins return any rules, everything is denied (implicit deny). + """ + def no_rules(datasette, actor, action): + return None + + plugin = PermissionRulesPlugin(no_rules) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "erin"} + result = await ds.allowed_resources(VIEW_TABLE, actor) + assert len(result.resources) == 0 + + # Single resource check too + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_action_specific_rules(ds): + """ + Rules that only apply to view-table should not grant insert-row. + """ + cb = _cb_allow_all_for_action("dana", VIEW_TABLE) + plugin = PermissionRulesPlugin(cb) + ds.pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "dana"} + + # view-table should be allowed + result = await ds.allowed_resources(VIEW_TABLE, actor) + assert len(result.resources) == 30 # 3 dbs x 10 tables + + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) + + # insert-row should be denied (no rules for it) + assert not await ds.allowed( + action="insert-row", + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) + finally: + ds.pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_actor_parameters_available_in_sql(ds): + """ + Test that :actor (JSON), :actor_id, and :action are all available in plugin SQL. + """ + def cb(datasette, actor, action): + return PermissionSQL( + sql=""" SELECT NULL AS parent, NULL AS child, 1 AS allow, 'Actor ID: ' || COALESCE(:actor_id, 'null') || - ', Actor JSON: ' || COALESCE(:actor, 'null') || ', Action: ' || :action AS reason WHERE :actor_id = 'test_user' AND :action = 'view-table' AND json_extract(:actor, '$.role') = 'admin' - """ - ) + """, + params={}, # :actor_id, :actor, :action are added by the framework + ) - return provider + plugin = PermissionRulesPlugin(cb) + ds.pm.register(plugin, name="test_plugin") - plugins = [plugin_using_all_parameters()] + try: + actor = {"id": "test_user", "role": "admin"} - # Test with full actor dict - rows = await resolve_permissions_from_catalog( - db, - {"id": "test_user", "role": "admin"}, - plugins, - "view-table", - TABLE_CANDIDATES_SQL, - implicit_deny=True, - ) + # Should be allowed because the SQL conditions are met + assert await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor=actor, + ) - # Should have allowed rows with reason containing all the info - allowed = [r for r in rows if r["allow"] == 1] - assert len(allowed) > 0 + # Different actor should be denied + assert not await ds.allowed( + action=VIEW_TABLE, + resource=TableResource("perm_accounting", "table01"), + actor={"id": "other_user", "role": "admin"}, + ) - # Check that the reason string contains evidence of all parameters - reason = allowed[0]["reason"] - assert "test_user" in reason - assert "view-table" in reason - # The :actor parameter should be the JSON string - assert "Actor JSON:" in reason + # Verify allowed_resources also works + result = await ds.allowed_resources(VIEW_TABLE, actor) + assert len(result.resources) == 30 # all tables allowed + finally: + ds.pm.unregister(plugin, name="test_plugin") @pytest.mark.asyncio -async def test_multiple_plugins_with_own_parameters(db): +async def test_multiple_plugins_with_own_parameters(ds): """ - Test that multiple plugins can use their own parameter names without conflict. - - This verifies that the parameter naming convention works: plugins prefix their - parameters (e.g., :plugin1_pattern, :plugin2_message) and both sets of parameters - are successfully bound in the SQL queries. + Multiple plugins can use their own parameter names without conflict. """ - await seed_catalog(db) - - def plugin_one() -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - if action != "view-table": - return PermissionSQL("plugin_one", "SELECT NULL WHERE 0", {}) - return PermissionSQL( - """ + def cb_one(datasette, actor, action): + if action != VIEW_TABLE: + return None + return PermissionSQL( + sql=""" SELECT database_name AS parent, table_name AS child, - 1 AS allow, 'Plugin one used param: ' || :plugin1_param AS reason + 1 AS allow, 'Plugin one: ' || :p1_param AS reason FROM catalog_tables - WHERE database_name = 'accounting' - """, - { - "plugin1_param": "value1", - }, - ) + WHERE database_name = 'perm_accounting' + """, + params={"p1_param": "value1"}, + ) - return provider - - def plugin_two() -> Callable[[str], PermissionSQL]: - def provider(action: str) -> PermissionSQL: - if action != "view-table": - return PermissionSQL("plugin_two", "SELECT NULL WHERE 0", {}) - return PermissionSQL( - """ + def cb_two(datasette, actor, action): + if action != VIEW_TABLE: + return None + return PermissionSQL( + sql=""" SELECT database_name AS parent, table_name AS child, - 1 AS allow, 'Plugin two used param: ' || :plugin2_param AS reason + 1 AS allow, 'Plugin two: ' || :p2_param AS reason FROM catalog_tables - WHERE database_name = 'hr' - """, - { - "plugin2_param": "value2", - }, - ) + WHERE database_name = 'perm_hr' + """, + params={"p2_param": "value2"}, + ) - return provider + plugin_one = PermissionRulesPlugin(cb_one) + plugin_two = PermissionRulesPlugin(cb_two) + ds.pm.register(plugin_one, name="test_plugin_one") + ds.pm.register(plugin_two, name="test_plugin_two") - plugins = [plugin_one(), plugin_two()] + try: + actor = {"id": "test_user"} + result = await ds.allowed_resources(VIEW_TABLE, actor, include_reasons=True) + allowed = _allowed_set(result.resources) - rows = await resolve_permissions_from_catalog( - db, - {"id": "test_user"}, - plugins, - "view-table", - TABLE_CANDIDATES_SQL, - implicit_deny=False, - ) + # Both plugins should contribute — accounting from plugin one, hr from plugin two + accounting_allowed = {(p, c) for p, c in allowed if p == "perm_accounting"} + hr_allowed = {(p, c) for p, c in allowed if p == "perm_hr"} - # Both plugins should contribute results with their parameters successfully bound - plugin_one_rows = [ - r for r in rows if r.get("reason") and "Plugin one" in r["reason"] - ] - plugin_two_rows = [ - r for r in rows if r.get("reason") and "Plugin two" in r["reason"] - ] + assert len(accounting_allowed) == 10 + assert len(hr_allowed) == 10 - assert len(plugin_one_rows) > 0, "Plugin one should contribute rules" - assert len(plugin_two_rows) > 0, "Plugin two should contribute rules" - - # Verify each plugin's parameters were successfully bound in the SQL - assert any( - "value1" in r.get("reason", "") for r in plugin_one_rows - ), "Plugin one's :plugin1_param should be bound" - assert any( - "value2" in r.get("reason", "") for r in plugin_two_rows - ), "Plugin two's :plugin2_param should be bound" + # Check reasons contain the parameterized values + for r in result.resources: + if r.parent == "perm_accounting": + assert any("value1" in reason for reason in r.reasons) + elif r.parent == "perm_hr": + assert any("value2" in reason for reason in r.reasons) + finally: + ds.pm.unregister(plugin_one, name="test_plugin_one") + ds.pm.unregister(plugin_two, name="test_plugin_two")