Refactor AllowedResourcesView to use datasette.allowed_resources()

Refs https://github.com/simonw/datasette/issues/2527#issuecomment-3444586698
This commit is contained in:
Simon Willison 2025-10-24 12:21:48 -07:00
commit 4d03e8c12e

View file

@ -5,13 +5,9 @@ from datasette.utils.asgi import Response, Forbidden
from datasette.utils import (
actor_matches_allow,
add_cors_headers,
await_me_maybe,
tilde_encode,
tilde_decode,
)
from datasette.permissions import PermissionSQL
from datasette.utils.permissions import resolve_permissions_from_catalog
from datasette.plugins import pm
from .base import BaseView, View
import secrets
import urllib
@ -263,172 +259,68 @@ class AllowedResourcesView(BaseView):
page_size = max_page_size
offset = (page - 1) * page_size
candidate_sql, candidate_params = self.CANDIDATE_SQL[action]
db = self.ds.get_internal_database()
required_tables = set()
if "catalog_tables" in candidate_sql:
required_tables.add("catalog_tables")
if "catalog_databases" in candidate_sql:
required_tables.add("catalog_databases")
for table in required_tables:
if not await db.table_exists(table):
headers = {}
if self.ds.cors:
add_cors_headers(headers)
return Response.json(
{
"action": action,
"actor_id": (actor or {}).get("id") if actor else None,
"page": page,
"page_size": page_size,
"total": 0,
"items": [],
},
headers=headers,
)
# Check if this action requires another action
action_obj = self.ds.actions.get(action)
if action_obj and action_obj.also_requires:
# Need to combine results from both actions
# Get allowed resources for the main action
plugins = []
for block in pm.hook.permission_resources_sql(
datasette=self.ds,
actor=actor,
# Use the simplified allowed_resources method
try:
allowed_resources = await self.ds.allowed_resources(
action=action,
):
block = await await_me_maybe(block)
if block is None:
continue
if isinstance(block, (list, tuple)):
candidates = block
else:
candidates = [block]
for candidate in candidates:
if candidate is None:
continue
plugins.append(candidate)
main_rows = await resolve_permissions_from_catalog(
db,
actor=actor,
plugins=plugins,
action=action,
candidate_sql=candidate_sql,
candidate_params=candidate_params,
implicit_deny=True,
parent=parent_filter,
)
main_allowed = {
(row["parent"], row["child"]) for row in main_rows if row["allow"] == 1
except Exception:
# If catalog tables don't exist yet, return empty results
headers = {}
if self.ds.cors:
add_cors_headers(headers)
return Response.json(
{
"action": action,
"actor_id": actor_id,
"page": page,
"page_size": page_size,
"total": 0,
"items": [],
},
headers=headers,
)
# Convert to list of dicts with resource path
allowed_rows = []
for resource in allowed_resources:
parent_val = resource.parent
child_val = resource.child
# Build resource path
if parent_val is None:
resource_path = "/"
elif child_val is None:
resource_path = f"/{parent_val}"
else:
resource_path = f"/{parent_val}/{child_val}"
row = {
"parent": parent_val,
"child": child_val,
"resource": resource_path,
}
# Get allowed resources for the required action
required_action = action_obj.also_requires
required_candidate_sql, required_candidate_params = self.CANDIDATE_SQL.get(
required_action, (None, None)
)
if not required_candidate_sql:
# If the required action doesn't have candidate SQL, deny everything
allowed_rows = []
else:
required_plugins = []
for block in pm.hook.permission_resources_sql(
datasette=self.ds,
actor=actor,
action=required_action,
):
block = await await_me_maybe(block)
if block is None:
continue
if isinstance(block, (list, tuple)):
candidates = block
else:
candidates = [block]
for candidate in candidates:
if candidate is None:
continue
required_plugins.append(candidate)
# Add debug fields if available
if has_debug_permission and hasattr(resource, "_reason"):
row["reason"] = resource._reason
if has_debug_permission and hasattr(resource, "_source_plugin"):
row["source_plugin"] = resource._source_plugin
required_rows = await resolve_permissions_from_catalog(
db,
actor=actor,
plugins=required_plugins,
action=required_action,
candidate_sql=required_candidate_sql,
candidate_params=required_candidate_params,
implicit_deny=True,
)
required_allowed = {
(row["parent"], row["child"])
for row in required_rows
if row["allow"] == 1
}
allowed_rows.append(row)
# Intersect the two sets - only resources allowed by BOTH actions
allowed_resources = main_allowed & required_allowed
# Get full row data for the allowed resources
allowed_rows = [
row
for row in main_rows
if row["allow"] == 1
and (row["parent"], row["child"]) in allowed_resources
]
else:
# No also_requires, use normal path
plugins = []
for block in pm.hook.permission_resources_sql(
datasette=self.ds,
actor=actor,
action=action,
):
block = await await_me_maybe(block)
if block is None:
continue
if isinstance(block, (list, tuple)):
candidates = block
else:
candidates = [block]
for candidate in candidates:
if candidate is None:
continue
plugins.append(candidate)
rows = await resolve_permissions_from_catalog(
db,
actor=actor,
plugins=plugins,
action=action,
candidate_sql=candidate_sql,
candidate_params=candidate_params,
implicit_deny=True,
)
allowed_rows = [row for row in rows if row["allow"] == 1]
if parent_filter is not None:
allowed_rows = [
row for row in allowed_rows if row["parent"] == parent_filter
]
# Apply child filter if specified
if child_filter is not None:
allowed_rows = [row for row in allowed_rows if row["child"] == child_filter]
# Pagination
total = len(allowed_rows)
paged_rows = allowed_rows[offset : offset + page_size]
items = []
for row in paged_rows:
item = {
"parent": row["parent"],
"child": row["child"],
"resource": row["resource"],
}
# Only include sensitive fields if user has permissions-debug
if has_debug_permission:
item["reason"] = row["reason"]
item["source_plugin"] = row["source_plugin"]
items.append(item)
# Items are already in the right format
items = paged_rows
def build_page_url(page_number):
pairs = []