mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
Address PR #2515 review comments
- Add URL to sqlite-permissions-poc in module docstring - Replace Optional with | None for modern Python syntax - Add Datasette type annotations - Add SQL comment explaining cascading permission logic - Refactor duplicated plugin result processing into helper function
This commit is contained in:
parent
a21a1b6c14
commit
79879b834a
1 changed files with 55 additions and 26 deletions
|
|
@ -2,7 +2,9 @@
|
|||
SQL query builder for hierarchical permission checking.
|
||||
|
||||
This module implements a cascading permission system based on the pattern
|
||||
from the sqlite-permissions-poc. It builds SQL queries that:
|
||||
from https://github.com/simonw/research/tree/main/sqlite-permissions-poc
|
||||
|
||||
It builds SQL queries that:
|
||||
|
||||
1. Start with all resources of a given type (from resource_type.resources_sql())
|
||||
2. Gather permission rules from plugins (via permission_resources_sql hook)
|
||||
|
|
@ -19,14 +21,46 @@ The core pattern is:
|
|||
- Across levels, child beats parent beats global
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from datasette.plugins import pm
|
||||
from datasette.utils import await_me_maybe
|
||||
from datasette.permissions import PermissionSQL
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from datasette.app import Datasette
|
||||
|
||||
|
||||
def _process_permission_results(results) -> tuple[list[str], dict]:
|
||||
"""
|
||||
Process plugin permission results into SQL fragments and parameters.
|
||||
|
||||
Args:
|
||||
results: Results from permission_resources_sql hook (may be list or single PermissionSQL)
|
||||
|
||||
Returns:
|
||||
A tuple of (list of SQL strings, dict of parameters)
|
||||
"""
|
||||
rule_sqls = []
|
||||
all_params = {}
|
||||
|
||||
if results is None:
|
||||
return rule_sqls, all_params
|
||||
|
||||
if isinstance(results, list):
|
||||
for plugin_sql in results:
|
||||
if isinstance(plugin_sql, PermissionSQL):
|
||||
rule_sqls.append(plugin_sql.sql)
|
||||
all_params.update(plugin_sql.params)
|
||||
elif isinstance(results, PermissionSQL):
|
||||
rule_sqls.append(results.sql)
|
||||
all_params.update(results.params)
|
||||
|
||||
return rule_sqls, all_params
|
||||
|
||||
|
||||
async def build_allowed_resources_sql(
|
||||
datasette,
|
||||
datasette: "Datasette",
|
||||
actor: dict | None,
|
||||
action: str,
|
||||
) -> tuple[str, dict]:
|
||||
|
|
@ -76,16 +110,9 @@ async def build_allowed_resources_sql(
|
|||
|
||||
for result in rule_results:
|
||||
result = await await_me_maybe(result)
|
||||
if result is None:
|
||||
continue
|
||||
if isinstance(result, list):
|
||||
for plugin_sql in result:
|
||||
if isinstance(plugin_sql, PermissionSQL):
|
||||
rule_sqls.append(plugin_sql.sql)
|
||||
all_params.update(plugin_sql.params)
|
||||
elif isinstance(result, PermissionSQL):
|
||||
rule_sqls.append(result.sql)
|
||||
all_params.update(result.params)
|
||||
sqls, params = _process_permission_results(result)
|
||||
rule_sqls.extend(sqls)
|
||||
all_params.update(params)
|
||||
|
||||
# If no rules, return empty result (deny all)
|
||||
if not rule_sqls:
|
||||
|
|
@ -135,6 +162,15 @@ global_lvl AS (
|
|||
decisions AS (
|
||||
SELECT
|
||||
b.parent, b.child,
|
||||
-- Cascading permission logic: child → parent → global, DENY beats ALLOW at each level
|
||||
-- Priority order:
|
||||
-- 1. Child-level deny (most specific, blocks access)
|
||||
-- 2. Child-level allow (most specific, grants access)
|
||||
-- 3. Parent-level deny (intermediate, blocks access)
|
||||
-- 4. Parent-level allow (intermediate, grants access)
|
||||
-- 5. Global-level deny (least specific, blocks access)
|
||||
-- 6. Global-level allow (least specific, grants access)
|
||||
-- 7. Default deny (no rules match)
|
||||
CASE
|
||||
WHEN cl.any_deny = 1 THEN 0
|
||||
WHEN cl.any_allow = 1 THEN 1
|
||||
|
|
@ -167,11 +203,11 @@ ORDER BY parent, child
|
|||
|
||||
|
||||
async def check_permission_for_resource(
|
||||
datasette,
|
||||
datasette: "Datasette",
|
||||
actor: dict | None,
|
||||
action: str,
|
||||
parent: Optional[str],
|
||||
child: Optional[str],
|
||||
parent: str | None,
|
||||
child: str | None,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if an actor has permission for a specific action on a specific resource.
|
||||
|
|
@ -207,16 +243,9 @@ async def check_permission_for_resource(
|
|||
|
||||
for result in rule_results:
|
||||
result = await await_me_maybe(result)
|
||||
if result is None:
|
||||
continue
|
||||
if isinstance(result, list):
|
||||
for plugin_sql in result:
|
||||
if isinstance(plugin_sql, PermissionSQL):
|
||||
rule_sqls.append(plugin_sql.sql)
|
||||
all_params.update(plugin_sql.params)
|
||||
elif isinstance(result, PermissionSQL):
|
||||
rule_sqls.append(result.sql)
|
||||
all_params.update(result.params)
|
||||
sqls, params = _process_permission_results(result)
|
||||
rule_sqls.extend(sqls)
|
||||
all_params.update(params)
|
||||
|
||||
# If no rules, default deny
|
||||
if not rule_sqls:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue