Add parent filter and include_is_private to allowed_resources()

Major improvements to the allowed_resources() API:

1. **parent filter**: Filter results to specific database in SQL, not Python
   - Avoids loading thousands of tables into Python memory
   - Filtering happens efficiently in SQLite

2. **include_is_private flag**: Detect private resources in single SQL query
   - Compares actor permissions vs anonymous permissions in SQL
   - LEFT JOIN between actor_allowed and anon_allowed CTEs
   - Returns is_private column: 1 if anonymous blocked, 0 otherwise
   - No individual check_visibility() calls needed

3. **Resource.private property**: Safe access with clear error messages
   - Raises AttributeError if accessed without include_is_private=True
   - Prevents accidental misuse of the property

4. **Database view optimization**: Use new API to eliminate redundant checks
   - Single bulk query replaces N individual permission checks
   - Private flag computed in SQL, not via check_visibility() calls
   - Views filtered from allowed_dict instead of checking db.view_names()

All permission filtering now happens in SQLite where it belongs, with
minimal data transferred to Python.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Simon Willison 2025-10-24 09:30:37 -07:00
commit 3adddad6aa
4 changed files with 319 additions and 133 deletions

View file

@ -1305,15 +1305,28 @@ class Datasette:
*,
action: str,
actor: dict | None = None,
parent: str | None = None,
include_is_private: bool = False,
) -> tuple[str, dict]:
"""
Build SQL query to get all resources the actor can access for the given action.
Args:
action: The action name (e.g., "view-table")
actor: The actor dict (or None for unauthenticated)
parent: Optional parent filter (e.g., database name) to limit results
include_is_private: If True, include is_private column showing if anonymous cannot access
Returns a tuple of (query, params) that can be executed against the internal database.
The query returns rows with (parent, child, reason) columns.
The query returns rows with (parent, child, reason) columns, plus is_private if requested.
Example:
query, params = await datasette.allowed_resources_sql(action="view-table", actor=actor)
query, params = await datasette.allowed_resources_sql(
action="view-table",
actor=actor,
parent="mydb",
include_is_private=True
)
result = await datasette.get_internal_database().execute(query, params)
"""
from datasette.utils.actions_sql import build_allowed_resources_sql
@ -1322,12 +1335,17 @@ class Datasette:
if not action_obj:
raise ValueError(f"Unknown action: {action}")
return await build_allowed_resources_sql(self, actor, action)
return await build_allowed_resources_sql(
self, actor, action, parent=parent, include_is_private=include_is_private
)
async def allowed_resources(
self,
action: str,
actor: dict | None = None,
*,
parent: str | None = None,
include_is_private: bool = False,
) -> list["Resource"]:
"""
Return all resources the actor can access for the given action.
@ -1335,10 +1353,25 @@ class Datasette:
Uses SQL to filter resources based on cascading permission rules.
Returns instances of the appropriate Resource subclass.
Args:
action: The action name (e.g., "view-table")
actor: The actor dict (or None for unauthenticated)
parent: Optional parent filter (e.g., database name) to limit results
include_is_private: If True, adds a .private attribute to each Resource
Example:
# Get all tables
tables = await datasette.allowed_resources("view-table", actor)
for table in tables:
print(f"{table.parent}/{table.child}")
# Get tables for specific database with private flag
tables = await datasette.allowed_resources(
"view-table", actor, parent="mydb", include_is_private=True
)
for table in tables:
if table.private:
print(f"{table.child} is private")
"""
from datasette.permissions import Resource
@ -1346,17 +1379,24 @@ class Datasette:
if not action_obj:
raise ValueError(f"Unknown action: {action}")
query, params = await self.allowed_resources_sql(action=action, actor=actor)
query, params = await self.allowed_resources_sql(
action=action,
actor=actor,
parent=parent,
include_is_private=include_is_private,
)
result = await self.get_internal_database().execute(query, params)
# Instantiate the appropriate Resource subclass for each row
resource_class = action_obj.resource_class
resources = []
for row in result.rows:
# row[0]=parent, row[1]=child, row[2]=reason (ignored)
# row[0]=parent, row[1]=child, row[2]=reason (ignored), row[3]=is_private (if requested)
# Create instance directly with parent/child from base class
resource = object.__new__(resource_class)
Resource.__init__(resource, parent=row[0], child=row[1])
if include_is_private:
resource.private = bool(row[3])
resources.append(resource)
return resources

View file

@ -26,6 +26,29 @@ class Resource(ABC):
"""
self.parent = parent
self.child = child
self._private = None # Sentinel to track if private was set
@property
def private(self) -> bool:
"""
Whether this resource is private (accessible to actor but not anonymous).
This property is only available on Resource objects returned from
allowed_resources() when include_is_private=True is used.
Raises:
AttributeError: If accessed without calling include_is_private=True
"""
if self._private is None:
raise AttributeError(
"The 'private' attribute is only available when using "
"allowed_resources(..., include_is_private=True)"
)
return self._private
@private.setter
def private(self, value: bool):
self._private = value
@classmethod
@abstractmethod

View file

@ -63,6 +63,9 @@ async def build_allowed_resources_sql(
datasette: "Datasette",
actor: dict | None,
action: str,
*,
parent: str | None = None,
include_is_private: bool = False,
) -> tuple[str, dict]:
"""
Build a SQL query that returns all resources the actor can access for this action.
@ -71,14 +74,17 @@ async def build_allowed_resources_sql(
datasette: The Datasette instance
actor: The actor dict (or None for unauthenticated)
action: The action name (e.g., "view-table", "view-database")
parent: Optional parent filter to limit results (e.g., database name)
include_is_private: If True, add is_private column showing if anonymous cannot access
Returns:
A tuple of (sql_query, params_dict)
The returned SQL query will have three columns:
The returned SQL query will have three columns (or four with include_is_private):
- parent: The parent resource identifier (or NULL)
- child: The child resource identifier (or NULL)
- reason: The reason from the rule that granted access
- is_private: (if include_is_private) 1 if anonymous cannot access, 0 otherwise
Example:
For action="view-table", this might return:
@ -116,90 +122,219 @@ async def build_allowed_resources_sql(
# If no rules, return empty result (deny all)
if not rule_sqls:
return "SELECT NULL AS parent, NULL AS child, NULL AS reason WHERE 0", {}
empty_cols = "NULL AS parent, NULL AS child, NULL AS reason"
if include_is_private:
empty_cols += ", NULL AS is_private"
return f"SELECT {empty_cols} WHERE 0", {}
# Build the cascading permission query
rules_union = " UNION ALL ".join(rule_sqls)
query = f"""
WITH
base AS (
{base_resources_sql}
),
all_rules AS (
{rules_union}
),
child_lvl AS (
SELECT b.parent, b.child,
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,
MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,
MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason
FROM base b
LEFT JOIN all_rules ar ON ar.parent = b.parent AND ar.child = b.child
GROUP BY b.parent, b.child
),
parent_lvl AS (
SELECT b.parent, b.child,
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,
MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,
MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason
FROM base b
LEFT JOIN all_rules ar ON ar.parent = b.parent AND ar.child IS NULL
GROUP BY b.parent, b.child
),
global_lvl AS (
SELECT b.parent, b.child,
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,
MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,
MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason
FROM base b
LEFT JOIN all_rules ar ON ar.parent IS NULL AND ar.child IS NULL
GROUP BY b.parent, b.child
),
decisions AS (
SELECT
b.parent, b.child,
-- Cascading permission logic: child parent global, DENY beats ALLOW at each level
-- Priority order:
-- 1. Child-level deny (most specific, blocks access)
-- 2. Child-level allow (most specific, grants access)
-- 3. Parent-level deny (intermediate, blocks access)
-- 4. Parent-level allow (intermediate, grants access)
-- 5. Global-level deny (least specific, blocks access)
-- 6. Global-level allow (least specific, grants access)
-- 7. Default deny (no rules match)
CASE
WHEN cl.any_deny = 1 THEN 0
WHEN cl.any_allow = 1 THEN 1
WHEN pl.any_deny = 1 THEN 0
WHEN pl.any_allow = 1 THEN 1
WHEN gl.any_deny = 1 THEN 0
WHEN gl.any_allow = 1 THEN 1
ELSE 0
END AS is_allowed,
CASE
WHEN cl.any_deny = 1 THEN cl.deny_reason
WHEN cl.any_allow = 1 THEN cl.allow_reason
WHEN pl.any_deny = 1 THEN pl.deny_reason
WHEN pl.any_allow = 1 THEN pl.allow_reason
WHEN gl.any_deny = 1 THEN gl.deny_reason
WHEN gl.any_allow = 1 THEN gl.allow_reason
ELSE 'default deny'
END AS reason
FROM base b
JOIN child_lvl cl USING (parent, child)
JOIN parent_lvl pl USING (parent, child)
JOIN global_lvl gl USING (parent, child)
)
SELECT parent, child, reason
FROM decisions
WHERE is_allowed = 1
ORDER BY parent, child
"""
return query.strip(), all_params
# Build the main query
query_parts = [
"WITH",
"base AS (",
f" {base_resources_sql}",
"),",
"all_rules AS (",
f" {rules_union}",
"),",
]
# If include_is_private, we need to build anonymous permissions too
if include_is_private:
# Get anonymous permission rules
anon_rule_results = pm.hook.permission_resources_sql(
datasette=datasette,
actor=None,
action=action,
)
anon_rule_sqls = []
anon_params = {}
for result in anon_rule_results:
result = await await_me_maybe(result)
sqls, params = _process_permission_results(result)
anon_rule_sqls.extend(sqls)
# Namespace anonymous params to avoid conflicts
for key, value in params.items():
anon_params[f"anon_{key}"] = value
# Rewrite anonymous SQL to use namespaced params
anon_sqls_rewritten = []
for sql in anon_rule_sqls:
for key in params.keys():
sql = sql.replace(f":{key}", f":anon_{key}")
anon_sqls_rewritten.append(sql)
all_params.update(anon_params)
if anon_sqls_rewritten:
anon_rules_union = " UNION ALL ".join(anon_sqls_rewritten)
query_parts.extend(
[
"anon_rules AS (",
f" {anon_rules_union}",
"),",
]
)
# Continue with the cascading logic
query_parts.extend(
[
"child_lvl AS (",
" SELECT b.parent, b.child,",
" MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,",
" MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,",
" MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,",
" MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason",
" FROM base b",
" LEFT JOIN all_rules ar ON ar.parent = b.parent AND ar.child = b.child",
" GROUP BY b.parent, b.child",
"),",
"parent_lvl AS (",
" SELECT b.parent, b.child,",
" MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,",
" MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,",
" MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,",
" MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason",
" FROM base b",
" LEFT JOIN all_rules ar ON ar.parent = b.parent AND ar.child IS NULL",
" GROUP BY b.parent, b.child",
"),",
"global_lvl AS (",
" SELECT b.parent, b.child,",
" MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,",
" MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,",
" MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,",
" MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason",
" FROM base b",
" LEFT JOIN all_rules ar ON ar.parent IS NULL AND ar.child IS NULL",
" GROUP BY b.parent, b.child",
"),",
]
)
# Add anonymous decision logic if needed
if include_is_private:
query_parts.extend(
[
"anon_child_lvl AS (",
" SELECT b.parent, b.child,",
" MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,",
" MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow",
" FROM base b",
" LEFT JOIN anon_rules ar ON ar.parent = b.parent AND ar.child = b.child",
" GROUP BY b.parent, b.child",
"),",
"anon_parent_lvl AS (",
" SELECT b.parent, b.child,",
" MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,",
" MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow",
" FROM base b",
" LEFT JOIN anon_rules ar ON ar.parent = b.parent AND ar.child IS NULL",
" GROUP BY b.parent, b.child",
"),",
"anon_global_lvl AS (",
" SELECT b.parent, b.child,",
" MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,",
" MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow",
" FROM base b",
" LEFT JOIN anon_rules ar ON ar.parent IS NULL AND ar.child IS NULL",
" GROUP BY b.parent, b.child",
"),",
"anon_decisions AS (",
" SELECT",
" b.parent, b.child,",
" CASE",
" WHEN acl.any_deny = 1 THEN 0",
" WHEN acl.any_allow = 1 THEN 1",
" WHEN apl.any_deny = 1 THEN 0",
" WHEN apl.any_allow = 1 THEN 1",
" WHEN agl.any_deny = 1 THEN 0",
" WHEN agl.any_allow = 1 THEN 1",
" ELSE 0",
" END AS anon_is_allowed",
" FROM base b",
" JOIN anon_child_lvl acl USING (parent, child)",
" JOIN anon_parent_lvl apl USING (parent, child)",
" JOIN anon_global_lvl agl USING (parent, child)",
"),",
]
)
# Final decisions
query_parts.extend(
[
"decisions AS (",
" SELECT",
" b.parent, b.child,",
" -- Cascading permission logic: child → parent → global, DENY beats ALLOW at each level",
" -- Priority order:",
" -- 1. Child-level deny (most specific, blocks access)",
" -- 2. Child-level allow (most specific, grants access)",
" -- 3. Parent-level deny (intermediate, blocks access)",
" -- 4. Parent-level allow (intermediate, grants access)",
" -- 5. Global-level deny (least specific, blocks access)",
" -- 6. Global-level allow (least specific, grants access)",
" -- 7. Default deny (no rules match)",
" CASE",
" WHEN cl.any_deny = 1 THEN 0",
" WHEN cl.any_allow = 1 THEN 1",
" WHEN pl.any_deny = 1 THEN 0",
" WHEN pl.any_allow = 1 THEN 1",
" WHEN gl.any_deny = 1 THEN 0",
" WHEN gl.any_allow = 1 THEN 1",
" ELSE 0",
" END AS is_allowed,",
" CASE",
" WHEN cl.any_deny = 1 THEN cl.deny_reason",
" WHEN cl.any_allow = 1 THEN cl.allow_reason",
" WHEN pl.any_deny = 1 THEN pl.deny_reason",
" WHEN pl.any_allow = 1 THEN pl.allow_reason",
" WHEN gl.any_deny = 1 THEN gl.deny_reason",
" WHEN gl.any_allow = 1 THEN gl.allow_reason",
" ELSE 'default deny'",
" END AS reason",
]
)
if include_is_private:
query_parts.append(
" , CASE WHEN ad.anon_is_allowed = 0 THEN 1 ELSE 0 END AS is_private"
)
query_parts.extend(
[
" FROM base b",
" JOIN child_lvl cl USING (parent, child)",
" JOIN parent_lvl pl USING (parent, child)",
" JOIN global_lvl gl USING (parent, child)",
]
)
if include_is_private:
query_parts.append(" JOIN anon_decisions ad USING (parent, child)")
query_parts.append(")")
# Final SELECT
select_cols = "parent, child, reason"
if include_is_private:
select_cols += ", is_private"
query_parts.append(f"SELECT {select_cols}")
query_parts.append("FROM decisions")
query_parts.append("WHERE is_allowed = 1")
# Add parent filter if specified
if parent is not None:
query_parts.append(" AND parent = :filter_parent")
all_params["filter_parent"] = parent
query_parts.append("ORDER BY parent, child")
query = "\n".join(query_parts)
return query, all_params
async def check_permission_for_resource(

View file

@ -71,34 +71,24 @@ class DatabaseView(View):
metadata = await datasette.get_database_metadata(database)
# Get all tables/views this actor can see in bulk
# Get all tables/views this actor can see in bulk with private flag
from datasette.resources import TableResource
allowed_tables = await datasette.allowed_resources("view-table", request.actor)
allowed_table_set = {
(r.parent, r.child) for r in allowed_tables if r.parent == database
}
allowed_tables = await datasette.allowed_resources(
"view-table", request.actor, parent=database, include_is_private=True
)
# Create lookup dict for quick access
allowed_dict = {r.child: r for r in allowed_tables}
sql_views = []
for view_name in await db.view_names():
if (database, view_name) in allowed_table_set:
# Check if it's private (requires elevated permissions)
_, view_private = await datasette.check_visibility(
request.actor,
permissions=[
("view-table", (database, view_name)),
("view-database", database),
"view-instance",
],
)
sql_views.append(
{
"name": view_name,
"private": view_private,
}
)
# Filter to just views
view_names_set = set(await db.view_names())
sql_views = [
{"name": name, "private": allowed_dict[name].private}
for name in allowed_dict
if name in view_names_set
]
tables = await get_tables(datasette, request, db, allowed_table_set)
tables = await get_tables(datasette, request, db, allowed_dict)
canned_queries = []
for query in (
await datasette.get_canned_queries(database, request.actor)
@ -341,7 +331,16 @@ class QueryContext(Context):
)
async def get_tables(datasette, request, db, allowed_table_set):
async def get_tables(datasette, request, db, allowed_dict):
"""
Get list of tables with metadata for the database view.
Args:
datasette: The Datasette instance
request: The current request
db: The database
allowed_dict: Dict mapping table name -> Resource object with .private attribute
"""
tables = []
database = db.name
table_counts = await db.table_counts(100)
@ -349,19 +348,9 @@ async def get_tables(datasette, request, db, allowed_table_set):
all_foreign_keys = await db.get_all_foreign_keys()
for table in table_counts:
if (database, table) not in allowed_table_set:
if table not in allowed_dict:
continue
# Check if it's private (requires elevated permissions)
_, table_private = await datasette.check_visibility(
request.actor,
permissions=[
("view-table", (database, table)),
("view-database", database),
"view-instance",
],
)
table_columns = await db.table_columns(table)
tables.append(
{
@ -372,7 +361,7 @@ async def get_tables(datasette, request, db, allowed_table_set):
"hidden": table in hidden_table_names,
"fts_table": await db.fts_table(table),
"foreign_keys": all_foreign_keys[table],
"private": table_private,
"private": allowed_dict[table].private,
}
)
tables.sort(key=lambda t: (t["hidden"], t["name"]))
@ -521,13 +510,14 @@ class QueryView(View):
db = await datasette.resolve_database(request)
database = db.name
# Get all tables/views this actor can see in bulk
# Get all tables/views this actor can see in bulk with private flag
from datasette.resources import TableResource
allowed_tables = await datasette.allowed_resources("view-table", request.actor)
allowed_table_set = {
(r.parent, r.child) for r in allowed_tables if r.parent == database
}
allowed_tables = await datasette.allowed_resources(
"view-table", request.actor, parent=database, include_is_private=True
)
# Create lookup dict for quick access
allowed_dict = {r.child: r for r in allowed_tables}
# Are we a canned query?
canned_query = None
@ -828,9 +818,7 @@ class QueryView(View):
show_hide_text=show_hide_text,
editable=not canned_query,
allow_execute_sql=allow_execute_sql,
tables=await get_tables(
datasette, request, db, allowed_table_set
),
tables=await get_tables(datasette, request, db, allowed_dict),
named_parameter_values=named_parameter_values,
edit_sql_url=edit_sql_url,
display_rows=await display_rows(