Compare commits

...

8 commits

Author SHA1 Message Date
Simon Willison
1d4448fc56
Use subtests in tests/test_docs.py (#2609)
Closes #2608
2025-12-04 21:36:39 -08:00
Simon Willison
2ca00b6c75 Release 1.0a23
Refs #2605, #2599
2025-12-02 19:20:43 -08:00
Simon Willison
03ab359208 tool.uv.package = true 2025-12-02 19:19:48 -08:00
Simon Willison
3eca3ad6d4 Better recipe for 'just docs' 2025-12-02 19:16:39 -08:00
Simon Willison
0a924524be
Split default_permissions.py into a package (#2603)
* Split default_permissions.py into a package, refs #2602

* Remove unused is_resource_allowed() method, improve test coverage

- Remove dead code: is_resource_allowed() method was never called
- Change isinstance check to assertion with error message
- Add test cases for table-level restrictions in restrictions_allow_action()
- Coverage for restrictions.py improved from 79% to 99%

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Additional permission test for gap spotted by coverage
2025-12-02 19:11:31 -08:00
Simon Willison
170b3ff61c Better fix for stale catalog_databases, closes #2606
Refs 2605
2025-12-02 19:00:13 -08:00
Simon Willison
c6c2a238c3 Fix for stale internal database bug, closes #2605 2025-12-02 16:22:42 -08:00
Simon Willison
68f1179bac Fix for text None shown on /-/actions, closes #2599 2025-11-26 17:12:52 -08:00
18 changed files with 1150 additions and 522 deletions

View file

@ -29,7 +29,7 @@ export DATASETTE_SECRET := "not_a_secret"
# Serve live docs on localhost:8000
@docs: cog blacken-docs
uv sync --extra docs && cd docs && uv run make livehtml
uv run --extra docs make -C docs livehtml
# Build docs as static HTML
@docs-build: cog blacken-docs

View file

@ -606,6 +606,15 @@ class Datasette:
"select database_name, schema_version from catalog_databases"
)
}
# Delete stale entries for databases that are no longer attached
stale_databases = set(current_schema_versions.keys()) - set(
self.databases.keys()
)
for stale_db_name in stale_databases:
await internal_db.execute_write(
"DELETE FROM catalog_databases WHERE database_name = ?",
[stale_db_name],
)
for database_name, db in self.databases.items():
schema_version = (await db.execute("PRAGMA schema_version")).first()[0]
# Compare schema versions to see if we should skip it

View file

@ -1,494 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
from datasette.utils import actor_matches_allow
import itsdangerous
import time
@hookimpl(specname="permission_resources_sql")
async def actor_restrictions_sql(datasette, actor, action):
"""Handle actor restriction-based permission rules (_r key)."""
if not actor:
return None
restrictions = actor.get("_r") if isinstance(actor, dict) else None
if restrictions is None:
return []
# Check if this action appears in restrictions (with abbreviations)
action_obj = datasette.actions.get(action)
action_checks = {action}
if action_obj and action_obj.abbr:
action_checks.add(action_obj.abbr)
# Check if globally allowed in restrictions
global_actions = restrictions.get("a", [])
is_globally_allowed = action_checks.intersection(global_actions)
if is_globally_allowed:
# Globally allowed - no restriction filtering needed
return []
# Not globally allowed - build restriction_sql that lists allowlisted resources
restriction_selects = []
restriction_params = {}
param_counter = 0
# Add database-level allowlisted resources
db_restrictions = restrictions.get("d", {})
for db_name, db_actions in db_restrictions.items():
if action_checks.intersection(db_actions):
prefix = f"restr_{param_counter}"
param_counter += 1
restriction_selects.append(
f"SELECT :{prefix}_parent AS parent, NULL AS child"
)
restriction_params[f"{prefix}_parent"] = db_name
# Add table-level allowlisted resources
resource_restrictions = restrictions.get("r", {})
for db_name, tables in resource_restrictions.items():
for table_name, table_actions in tables.items():
if action_checks.intersection(table_actions):
prefix = f"restr_{param_counter}"
param_counter += 1
restriction_selects.append(
f"SELECT :{prefix}_parent AS parent, :{prefix}_child AS child"
)
restriction_params[f"{prefix}_parent"] = db_name
restriction_params[f"{prefix}_child"] = table_name
if not restriction_selects:
# Action not in allowlist - return empty restriction (INTERSECT will return no results)
return [
PermissionSQL(
params={"deny": f"actor restrictions: {action} not in allowlist"},
restriction_sql="SELECT NULL AS parent, NULL AS child WHERE 0", # Empty set
)
]
# Build restriction SQL that returns allowed (parent, child) pairs
restriction_sql = "\nUNION ALL\n".join(restriction_selects)
# Return restriction-only PermissionSQL (sql=None means no permission rules)
# The restriction_sql does the actual filtering via INTERSECT
return [
PermissionSQL(
params=restriction_params,
restriction_sql=restriction_sql,
)
]
@hookimpl(specname="permission_resources_sql")
async def root_user_permissions_sql(datasette, actor, action):
"""Grant root user full permissions when enabled."""
if datasette.root_enabled and actor and actor.get("id") == "root":
# Add a single global-level allow rule (NULL, NULL) for root
# This allows root to access everything by default, but database-level
# and table-level deny rules in config can still block specific resources
return PermissionSQL.allow(reason="root user")
return None
@hookimpl(specname="permission_resources_sql")
async def config_permissions_sql(datasette, actor, action):
"""Apply config-based permission rules from datasette.yaml."""
config = datasette.config or {}
def evaluate(allow_block):
if allow_block is None:
return None
return actor_matches_allow(actor, allow_block)
has_restrictions = actor and "_r" in actor if actor else False
restrictions = actor.get("_r", {}) if actor else {}
action_obj = datasette.actions.get(action)
action_checks = {action}
if action_obj and action_obj.abbr:
action_checks.add(action_obj.abbr)
restricted_databases: set[str] = set()
restricted_tables: set[tuple[str, str]] = set()
if has_restrictions:
restricted_databases = {
db_name
for db_name, db_actions in (restrictions.get("d") or {}).items()
if action_checks.intersection(db_actions)
}
restricted_tables = {
(db_name, table_name)
for db_name, tables in (restrictions.get("r") or {}).items()
for table_name, table_actions in tables.items()
if action_checks.intersection(table_actions)
}
# Tables implicitly reference their parent databases
restricted_databases.update(db for db, _ in restricted_tables)
def is_in_restriction_allowlist(parent, child, action_name):
"""Check if a resource is in the actor's restriction allowlist for this action"""
if not has_restrictions:
return True # No restrictions, all resources allowed
# Check global allowlist
if action_checks.intersection(restrictions.get("a", [])):
return True
# Check database-level allowlist
if parent and action_checks.intersection(
restrictions.get("d", {}).get(parent, [])
):
return True
# Check table-level allowlist
if parent:
table_restrictions = (restrictions.get("r", {}) or {}).get(parent, {})
if child:
table_actions = table_restrictions.get(child, [])
if action_checks.intersection(table_actions):
return True
else:
# Parent query should proceed if any child in this database is allowlisted
for table_actions in table_restrictions.values():
if action_checks.intersection(table_actions):
return True
# Parent/child both None: include if any restrictions exist for this action
if parent is None and child is None:
if action_checks.intersection(restrictions.get("a", [])):
return True
if restricted_databases:
return True
if restricted_tables:
return True
return False
rows = []
def add_row(parent, child, result, scope):
if result is None:
return
rows.append(
(
parent,
child,
bool(result),
f"config {'allow' if result else 'deny'} {scope}",
)
)
def add_row_allow_block(parent, child, allow_block, scope):
"""For 'allow' blocks, always add a row if the block exists - deny if no match"""
if allow_block is None:
return
# If actor has restrictions and this resource is NOT in allowlist, skip this config rule
# Restrictions act as a gating filter - config cannot grant access to restricted-out resources
if not is_in_restriction_allowlist(parent, child, action):
return
result = evaluate(allow_block)
bool_result = bool(result)
# If result is None (no match) or False, treat as deny
rows.append(
(
parent,
child,
bool_result, # None becomes False, False stays False, True stays True
f"config {'allow' if result else 'deny'} {scope}",
)
)
if has_restrictions and not bool_result and child is None:
reason = f"config deny {scope} (restriction gate)"
if parent is None:
# Root-level deny: add more specific denies for restricted resources
if action_obj and action_obj.takes_parent:
for db_name in restricted_databases:
rows.append((db_name, None, 0, reason))
if action_obj and action_obj.takes_child:
for db_name, table_name in restricted_tables:
rows.append((db_name, table_name, 0, reason))
else:
# Database-level deny: add child-level denies for restricted tables
if action_obj and action_obj.takes_child:
for db_name, table_name in restricted_tables:
if db_name == parent:
rows.append((db_name, table_name, 0, reason))
root_perm = (config.get("permissions") or {}).get(action)
add_row(None, None, evaluate(root_perm), f"permissions for {action}")
for db_name, db_config in (config.get("databases") or {}).items():
db_perm = (db_config.get("permissions") or {}).get(action)
add_row(
db_name, None, evaluate(db_perm), f"permissions for {action} on {db_name}"
)
for table_name, table_config in (db_config.get("tables") or {}).items():
table_perm = (table_config.get("permissions") or {}).get(action)
add_row(
db_name,
table_name,
evaluate(table_perm),
f"permissions for {action} on {db_name}/{table_name}",
)
if action == "view-table":
table_allow = (table_config or {}).get("allow")
add_row_allow_block(
db_name,
table_name,
table_allow,
f"allow for {action} on {db_name}/{table_name}",
)
for query_name, query_config in (db_config.get("queries") or {}).items():
# query_config can be a string (just SQL) or a dict (with SQL and options)
if isinstance(query_config, dict):
query_perm = (query_config.get("permissions") or {}).get(action)
add_row(
db_name,
query_name,
evaluate(query_perm),
f"permissions for {action} on {db_name}/{query_name}",
)
if action == "view-query":
query_allow = query_config.get("allow")
add_row_allow_block(
db_name,
query_name,
query_allow,
f"allow for {action} on {db_name}/{query_name}",
)
if action == "view-database":
db_allow = db_config.get("allow")
add_row_allow_block(
db_name, None, db_allow, f"allow for {action} on {db_name}"
)
if action == "execute-sql":
db_allow_sql = db_config.get("allow_sql")
add_row_allow_block(db_name, None, db_allow_sql, f"allow_sql for {db_name}")
if action == "view-table":
# Database-level allow block affects all tables in that database
db_allow = db_config.get("allow")
add_row_allow_block(
db_name, None, db_allow, f"allow for {action} on {db_name}"
)
if action == "view-query":
# Database-level allow block affects all queries in that database
db_allow = db_config.get("allow")
add_row_allow_block(
db_name, None, db_allow, f"allow for {action} on {db_name}"
)
# Root-level allow block applies to all view-* actions
if action == "view-instance":
allow_block = config.get("allow")
add_row_allow_block(None, None, allow_block, "allow for view-instance")
if action == "view-database":
# Root-level allow block also applies to view-database
allow_block = config.get("allow")
add_row_allow_block(None, None, allow_block, "allow for view-database")
if action == "view-table":
# Root-level allow block also applies to view-table
allow_block = config.get("allow")
add_row_allow_block(None, None, allow_block, "allow for view-table")
if action == "view-query":
# Root-level allow block also applies to view-query
allow_block = config.get("allow")
add_row_allow_block(None, None, allow_block, "allow for view-query")
if action == "execute-sql":
allow_sql = config.get("allow_sql")
add_row_allow_block(None, None, allow_sql, "allow_sql")
if not rows:
return []
parts = []
params = {}
for idx, (parent, child, allow, reason) in enumerate(rows):
key = f"cfg_{idx}"
parts.append(
f"SELECT :{key}_parent AS parent, :{key}_child AS child, :{key}_allow AS allow, :{key}_reason AS reason"
)
params[f"{key}_parent"] = parent
params[f"{key}_child"] = child
params[f"{key}_allow"] = 1 if allow else 0
params[f"{key}_reason"] = reason
sql = "\nUNION ALL\n".join(parts)
return [PermissionSQL(sql=sql, params=params)]
@hookimpl(specname="permission_resources_sql")
async def default_allow_sql_check(datasette, actor, action):
"""Enforce default_allow_sql setting for execute-sql action."""
if action == "execute-sql" and not datasette.setting("default_allow_sql"):
return PermissionSQL.deny(reason="default_allow_sql is false")
return None
@hookimpl(specname="permission_resources_sql")
async def default_action_permissions_sql(datasette, actor, action):
"""Apply default allow rules for standard view/execute actions.
With the INTERSECT-based restriction approach, these defaults are always generated
and then filtered by restriction_sql if the actor has restrictions.
"""
# Skip default allow rules if default_deny is enabled
if datasette.default_deny:
return None
default_allow_actions = {
"view-instance",
"view-database",
"view-database-download",
"view-table",
"view-query",
"execute-sql",
}
if action in default_allow_actions:
reason = f"default allow for {action}".replace("'", "''")
return PermissionSQL.allow(reason=reason)
return None
def restrictions_allow_action(
datasette: "Datasette",
restrictions: dict,
action: str,
resource: str | tuple[str, str],
):
"""
Check if actor restrictions allow the requested action against the requested resource.
Restrictions work on an exact-match basis: if an actor has view-table permission,
they can view tables, but NOT automatically view-instance or view-database.
Each permission is checked independently without implication logic.
"""
# Does this action have an abbreviation?
to_check = {action}
action_obj = datasette.actions.get(action)
if action_obj and action_obj.abbr:
to_check.add(action_obj.abbr)
# Check if restrictions explicitly allow this action
# Restrictions can be at three levels:
# - "a": global (any resource)
# - "d": per-database
# - "r": per-table/resource
# Check global level (any resource)
all_allowed = restrictions.get("a")
if all_allowed is not None:
assert isinstance(all_allowed, list)
if to_check.intersection(all_allowed):
return True
# Check database level
if resource:
if isinstance(resource, str):
database_name = resource
else:
database_name = resource[0]
database_allowed = restrictions.get("d", {}).get(database_name)
if database_allowed is not None:
assert isinstance(database_allowed, list)
if to_check.intersection(database_allowed):
return True
# Check table/resource level
if resource is not None and not isinstance(resource, str) and len(resource) == 2:
database, table = resource
table_allowed = restrictions.get("r", {}).get(database, {}).get(table)
if table_allowed is not None:
assert isinstance(table_allowed, list)
if to_check.intersection(table_allowed):
return True
# This action is not explicitly allowed, so reject it
return False
@hookimpl
def actor_from_request(datasette, request):
prefix = "dstok_"
if not datasette.setting("allow_signed_tokens"):
return None
max_signed_tokens_ttl = datasette.setting("max_signed_tokens_ttl")
authorization = request.headers.get("authorization")
if not authorization:
return None
if not authorization.startswith("Bearer "):
return None
token = authorization[len("Bearer ") :]
if not token.startswith(prefix):
return None
token = token[len(prefix) :]
try:
decoded = datasette.unsign(token, namespace="token")
except itsdangerous.BadSignature:
return None
if "t" not in decoded:
# Missing timestamp
return None
created = decoded["t"]
if not isinstance(created, int):
# Invalid timestamp
return None
duration = decoded.get("d")
if duration is not None and not isinstance(duration, int):
# Invalid duration
return None
if (duration is None and max_signed_tokens_ttl) or (
duration is not None
and max_signed_tokens_ttl
and duration > max_signed_tokens_ttl
):
duration = max_signed_tokens_ttl
if duration:
if time.time() - created > duration:
# Expired
return None
actor = {"id": decoded["a"], "token": "dstok"}
if "_r" in decoded:
actor["_r"] = decoded["_r"]
if duration:
actor["token_expires"] = created + duration
return actor
@hookimpl
def skip_csrf(scope):
# Skip CSRF check for requests with content-type: application/json
if scope["type"] == "http":
headers = scope.get("headers") or {}
if dict(headers).get(b"content-type") == b"application/json":
return True
@hookimpl
def canned_queries(datasette, database, actor):
"""Return canned queries from datasette configuration."""
queries = (
((datasette.config or {}).get("databases") or {}).get(database) or {}
).get("queries") or {}
return queries

View file

@ -0,0 +1,59 @@
"""
Default permission implementations for Datasette.
This module provides the built-in permission checking logic through implementations
of the permission_resources_sql hook. The hooks are organized by their purpose:
1. Actor Restrictions - Enforces _r allowlists embedded in actor tokens
2. Root User - Grants full access when --root flag is used
3. Config Rules - Applies permissions from datasette.yaml
4. Default Settings - Enforces default_allow_sql and default view permissions
IMPORTANT: These hooks return PermissionSQL objects that are combined using SQL
UNION/INTERSECT operations. The order of evaluation is:
- restriction_sql fields are INTERSECTed (all must match)
- Regular sql fields are UNIONed and evaluated with cascading priority
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
# Re-export all hooks and public utilities
from .restrictions import (
actor_restrictions_sql,
restrictions_allow_action,
ActorRestrictions,
)
from .root import root_user_permissions_sql
from .config import config_permissions_sql
from .defaults import (
default_allow_sql_check,
default_action_permissions_sql,
DEFAULT_ALLOW_ACTIONS,
)
from .tokens import actor_from_signed_api_token
@hookimpl
def skip_csrf(scope) -> Optional[bool]:
"""Skip CSRF check for JSON content-type requests."""
if scope["type"] == "http":
headers = scope.get("headers") or {}
if dict(headers).get(b"content-type") == b"application/json":
return True
return None
@hookimpl
def canned_queries(datasette: "Datasette", database: str, actor) -> dict:
"""Return canned queries defined in datasette.yaml configuration."""
queries = (
((datasette.config or {}).get("databases") or {}).get(database) or {}
).get("queries") or {}
return queries

View file

@ -0,0 +1,442 @@
"""
Config-based permission handling for Datasette.
Applies permission rules from datasette.yaml configuration.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
from datasette.utils import actor_matches_allow
from .helpers import PermissionRowCollector, get_action_name_variants
class ConfigPermissionProcessor:
"""
Processes permission rules from datasette.yaml configuration.
Configuration structure:
permissions: # Root-level permissions block
view-instance:
id: admin
databases:
mydb:
permissions: # Database-level permissions
view-database:
id: admin
allow: # Database-level allow block (for view-*)
id: viewer
allow_sql: # execute-sql allow block
id: analyst
tables:
users:
permissions: # Table-level permissions
view-table:
id: admin
allow: # Table-level allow block
id: viewer
queries:
my_query:
permissions: # Query-level permissions
view-query:
id: admin
allow: # Query-level allow block
id: viewer
"""
def __init__(
self,
datasette: "Datasette",
actor: Optional[dict],
action: str,
):
self.datasette = datasette
self.actor = actor
self.action = action
self.config = datasette.config or {}
self.collector = PermissionRowCollector(prefix="cfg")
# Pre-compute action variants
self.action_checks = get_action_name_variants(datasette, action)
self.action_obj = datasette.actions.get(action)
# Parse restrictions if present
self.has_restrictions = actor and "_r" in actor if actor else False
self.restrictions = actor.get("_r", {}) if actor else {}
# Pre-compute restriction info for efficiency
self.restricted_databases: Set[str] = set()
self.restricted_tables: Set[Tuple[str, str]] = set()
if self.has_restrictions:
self.restricted_databases = {
db_name
for db_name, db_actions in (self.restrictions.get("d") or {}).items()
if self.action_checks.intersection(db_actions)
}
self.restricted_tables = {
(db_name, table_name)
for db_name, tables in (self.restrictions.get("r") or {}).items()
for table_name, table_actions in tables.items()
if self.action_checks.intersection(table_actions)
}
# Tables implicitly reference their parent databases
self.restricted_databases.update(db for db, _ in self.restricted_tables)
def evaluate_allow_block(self, allow_block: Any) -> Optional[bool]:
"""Evaluate an allow block against the current actor."""
if allow_block is None:
return None
return actor_matches_allow(self.actor, allow_block)
def is_in_restriction_allowlist(
self,
parent: Optional[str],
child: Optional[str],
) -> bool:
"""Check if resource is allowed by actor restrictions."""
if not self.has_restrictions:
return True # No restrictions, all resources allowed
# Check global allowlist
if self.action_checks.intersection(self.restrictions.get("a", [])):
return True
# Check database-level allowlist
if parent and self.action_checks.intersection(
self.restrictions.get("d", {}).get(parent, [])
):
return True
# Check table-level allowlist
if parent:
table_restrictions = (self.restrictions.get("r", {}) or {}).get(parent, {})
if child:
table_actions = table_restrictions.get(child, [])
if self.action_checks.intersection(table_actions):
return True
else:
# Parent query should proceed if any child in this database is allowlisted
for table_actions in table_restrictions.values():
if self.action_checks.intersection(table_actions):
return True
# Parent/child both None: include if any restrictions exist for this action
if parent is None and child is None:
if self.action_checks.intersection(self.restrictions.get("a", [])):
return True
if self.restricted_databases:
return True
if self.restricted_tables:
return True
return False
def add_permissions_rule(
self,
parent: Optional[str],
child: Optional[str],
permissions_block: Optional[dict],
scope_desc: str,
) -> None:
"""Add a rule from a permissions:{action} block."""
if permissions_block is None:
return
action_allow_block = permissions_block.get(self.action)
result = self.evaluate_allow_block(action_allow_block)
self.collector.add(
parent=parent,
child=child,
allow=result,
reason=f"config {'allow' if result else 'deny'} {scope_desc}",
if_not_none=True,
)
def add_allow_block_rule(
self,
parent: Optional[str],
child: Optional[str],
allow_block: Any,
scope_desc: str,
) -> None:
"""
Add rules from an allow:{} block.
For allow blocks, if the block exists but doesn't match the actor,
this is treated as a deny. We also handle the restriction-gate logic.
"""
if allow_block is None:
return
# Skip if resource is not in restriction allowlist
if not self.is_in_restriction_allowlist(parent, child):
return
result = self.evaluate_allow_block(allow_block)
bool_result = bool(result)
self.collector.add(
parent,
child,
bool_result,
f"config {'allow' if result else 'deny'} {scope_desc}",
)
# Handle restriction-gate: add explicit denies for restricted resources
self._add_restriction_gate_denies(parent, child, bool_result, scope_desc)
def _add_restriction_gate_denies(
self,
parent: Optional[str],
child: Optional[str],
is_allowed: bool,
scope_desc: str,
) -> None:
"""
When a config rule denies at a higher level, add explicit denies
for restricted resources to prevent child-level allows from
incorrectly granting access.
"""
if is_allowed or child is not None or not self.has_restrictions:
return
if not self.action_obj:
return
reason = f"config deny {scope_desc} (restriction gate)"
if parent is None:
# Root-level deny: add denies for all restricted resources
if self.action_obj.takes_parent:
for db_name in self.restricted_databases:
self.collector.add(db_name, None, False, reason)
if self.action_obj.takes_child:
for db_name, table_name in self.restricted_tables:
self.collector.add(db_name, table_name, False, reason)
else:
# Database-level deny: add denies for tables in that database
if self.action_obj.takes_child:
for db_name, table_name in self.restricted_tables:
if db_name == parent:
self.collector.add(db_name, table_name, False, reason)
def process(self) -> Optional[PermissionSQL]:
"""Process all config rules and return combined PermissionSQL."""
self._process_root_permissions()
self._process_databases()
self._process_root_allow_blocks()
return self.collector.to_permission_sql()
def _process_root_permissions(self) -> None:
"""Process root-level permissions block."""
root_perms = self.config.get("permissions") or {}
self.add_permissions_rule(
None,
None,
root_perms,
f"permissions for {self.action}",
)
def _process_databases(self) -> None:
"""Process database-level and nested configurations."""
databases = self.config.get("databases") or {}
for db_name, db_config in databases.items():
self._process_database(db_name, db_config or {})
def _process_database(self, db_name: str, db_config: dict) -> None:
"""Process a single database's configuration."""
# Database-level permissions block
db_perms = db_config.get("permissions") or {}
self.add_permissions_rule(
db_name,
None,
db_perms,
f"permissions for {self.action} on {db_name}",
)
# Process tables
for table_name, table_config in (db_config.get("tables") or {}).items():
self._process_table(db_name, table_name, table_config or {})
# Process queries
for query_name, query_config in (db_config.get("queries") or {}).items():
self._process_query(db_name, query_name, query_config)
# Database-level allow blocks
self._process_database_allow_blocks(db_name, db_config)
def _process_table(
self,
db_name: str,
table_name: str,
table_config: dict,
) -> None:
"""Process a single table's configuration."""
# Table-level permissions block
table_perms = table_config.get("permissions") or {}
self.add_permissions_rule(
db_name,
table_name,
table_perms,
f"permissions for {self.action} on {db_name}/{table_name}",
)
# Table-level allow block (for view-table)
if self.action == "view-table":
self.add_allow_block_rule(
db_name,
table_name,
table_config.get("allow"),
f"allow for {self.action} on {db_name}/{table_name}",
)
def _process_query(
self,
db_name: str,
query_name: str,
query_config: Any,
) -> None:
"""Process a single query's configuration."""
# Query config can be a string (just SQL) or dict
if not isinstance(query_config, dict):
return
# Query-level permissions block
query_perms = query_config.get("permissions") or {}
self.add_permissions_rule(
db_name,
query_name,
query_perms,
f"permissions for {self.action} on {db_name}/{query_name}",
)
# Query-level allow block (for view-query)
if self.action == "view-query":
self.add_allow_block_rule(
db_name,
query_name,
query_config.get("allow"),
f"allow for {self.action} on {db_name}/{query_name}",
)
def _process_database_allow_blocks(
self,
db_name: str,
db_config: dict,
) -> None:
"""Process database-level allow/allow_sql blocks."""
# view-database allow block
if self.action == "view-database":
self.add_allow_block_rule(
db_name,
None,
db_config.get("allow"),
f"allow for {self.action} on {db_name}",
)
# execute-sql allow_sql block
if self.action == "execute-sql":
self.add_allow_block_rule(
db_name,
None,
db_config.get("allow_sql"),
f"allow_sql for {db_name}",
)
# view-table uses database-level allow for inheritance
if self.action == "view-table":
self.add_allow_block_rule(
db_name,
None,
db_config.get("allow"),
f"allow for {self.action} on {db_name}",
)
# view-query uses database-level allow for inheritance
if self.action == "view-query":
self.add_allow_block_rule(
db_name,
None,
db_config.get("allow"),
f"allow for {self.action} on {db_name}",
)
def _process_root_allow_blocks(self) -> None:
"""Process root-level allow/allow_sql blocks."""
root_allow = self.config.get("allow")
if self.action == "view-instance":
self.add_allow_block_rule(
None,
None,
root_allow,
"allow for view-instance",
)
if self.action == "view-database":
self.add_allow_block_rule(
None,
None,
root_allow,
"allow for view-database",
)
if self.action == "view-table":
self.add_allow_block_rule(
None,
None,
root_allow,
"allow for view-table",
)
if self.action == "view-query":
self.add_allow_block_rule(
None,
None,
root_allow,
"allow for view-query",
)
if self.action == "execute-sql":
self.add_allow_block_rule(
None,
None,
self.config.get("allow_sql"),
"allow_sql",
)
@hookimpl(specname="permission_resources_sql")
async def config_permissions_sql(
datasette: "Datasette",
actor: Optional[dict],
action: str,
) -> Optional[List[PermissionSQL]]:
"""
Apply permission rules from datasette.yaml configuration.
This processes:
- permissions: blocks at root, database, table, and query levels
- allow: blocks for view-* actions
- allow_sql: blocks for execute-sql action
"""
processor = ConfigPermissionProcessor(datasette, actor, action)
result = processor.process()
if result is None:
return []
return [result]

View file

@ -0,0 +1,70 @@
"""
Default permission settings for Datasette.
Provides default allow rules for standard view/execute actions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
# Actions that are allowed by default (unless --default-deny is used)
DEFAULT_ALLOW_ACTIONS = frozenset(
{
"view-instance",
"view-database",
"view-database-download",
"view-table",
"view-query",
"execute-sql",
}
)
@hookimpl(specname="permission_resources_sql")
async def default_allow_sql_check(
datasette: "Datasette",
actor: Optional[dict],
action: str,
) -> Optional[PermissionSQL]:
"""
Enforce the default_allow_sql setting.
When default_allow_sql is false (the default), execute-sql is denied
unless explicitly allowed by config or other rules.
"""
if action == "execute-sql":
if not datasette.setting("default_allow_sql"):
return PermissionSQL.deny(reason="default_allow_sql is false")
return None
@hookimpl(specname="permission_resources_sql")
async def default_action_permissions_sql(
datasette: "Datasette",
actor: Optional[dict],
action: str,
) -> Optional[PermissionSQL]:
"""
Provide default allow rules for standard view/execute actions.
These defaults are skipped when datasette is started with --default-deny.
The restriction_sql mechanism (from actor_restrictions_sql) will still
filter these results if the actor has restrictions.
"""
if datasette.default_deny:
return None
if action in DEFAULT_ALLOW_ACTIONS:
reason = f"default allow for {action}".replace("'", "''")
return PermissionSQL.allow(reason=reason)
return None

View file

@ -0,0 +1,85 @@
"""
Shared helper utilities for default permission implementations.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Set
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette.permissions import PermissionSQL
def get_action_name_variants(datasette: "Datasette", action: str) -> Set[str]:
"""
Get all name variants for an action (full name and abbreviation).
Example:
get_action_name_variants(ds, "view-table") -> {"view-table", "vt"}
"""
variants = {action}
action_obj = datasette.actions.get(action)
if action_obj and action_obj.abbr:
variants.add(action_obj.abbr)
return variants
def action_in_list(datasette: "Datasette", action: str, action_list: list) -> bool:
"""Check if an action (or its abbreviation) is in a list."""
return bool(get_action_name_variants(datasette, action).intersection(action_list))
@dataclass
class PermissionRow:
"""A single permission rule row."""
parent: Optional[str]
child: Optional[str]
allow: bool
reason: str
class PermissionRowCollector:
"""Collects permission rows and converts them to PermissionSQL."""
def __init__(self, prefix: str = "row"):
self.rows: List[PermissionRow] = []
self.prefix = prefix
def add(
self,
parent: Optional[str],
child: Optional[str],
allow: Optional[bool],
reason: str,
if_not_none: bool = False,
) -> None:
"""Add a permission row. If if_not_none=True, only add if allow is not None."""
if if_not_none and allow is None:
return
self.rows.append(PermissionRow(parent, child, allow, reason))
def to_permission_sql(self) -> Optional[PermissionSQL]:
"""Convert collected rows to a PermissionSQL object."""
if not self.rows:
return None
parts = []
params = {}
for idx, row in enumerate(self.rows):
key = f"{self.prefix}_{idx}"
parts.append(
f"SELECT :{key}_parent AS parent, :{key}_child AS child, "
f":{key}_allow AS allow, :{key}_reason AS reason"
)
params[f"{key}_parent"] = row.parent
params[f"{key}_child"] = row.child
params[f"{key}_allow"] = 1 if row.allow else 0
params[f"{key}_reason"] = row.reason
sql = "\nUNION ALL\n".join(parts)
return PermissionSQL(sql=sql, params=params)

View file

@ -0,0 +1,195 @@
"""
Actor restriction handling for Datasette permissions.
This module handles the _r (restrictions) key in actor dictionaries, which
contains allowlists of resources the actor can access.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Set, Tuple
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
from .helpers import action_in_list, get_action_name_variants
@dataclass
class ActorRestrictions:
"""Parsed actor restrictions from the _r key."""
global_actions: List[str] # _r.a - globally allowed actions
database_actions: dict # _r.d - {db_name: [actions]}
table_actions: dict # _r.r - {db_name: {table: [actions]}}
@classmethod
def from_actor(cls, actor: Optional[dict]) -> Optional["ActorRestrictions"]:
"""Parse restrictions from actor dict. Returns None if no restrictions."""
if not actor:
return None
assert isinstance(actor, dict), "actor must be a dictionary"
restrictions = actor.get("_r")
if restrictions is None:
return None
return cls(
global_actions=restrictions.get("a", []),
database_actions=restrictions.get("d", {}),
table_actions=restrictions.get("r", {}),
)
def is_action_globally_allowed(self, datasette: "Datasette", action: str) -> bool:
"""Check if action is in the global allowlist."""
return action_in_list(datasette, action, self.global_actions)
def get_allowed_databases(self, datasette: "Datasette", action: str) -> Set[str]:
"""Get database names where this action is allowed."""
allowed = set()
for db_name, db_actions in self.database_actions.items():
if action_in_list(datasette, action, db_actions):
allowed.add(db_name)
return allowed
def get_allowed_tables(
self, datasette: "Datasette", action: str
) -> Set[Tuple[str, str]]:
"""Get (database, table) pairs where this action is allowed."""
allowed = set()
for db_name, tables in self.table_actions.items():
for table_name, table_actions in tables.items():
if action_in_list(datasette, action, table_actions):
allowed.add((db_name, table_name))
return allowed
@hookimpl(specname="permission_resources_sql")
async def actor_restrictions_sql(
datasette: "Datasette",
actor: Optional[dict],
action: str,
) -> Optional[List[PermissionSQL]]:
"""
Handle actor restriction-based permission rules.
When an actor has an "_r" key, it contains an allowlist of resources they
can access. This function returns restriction_sql that filters the final
results to only include resources in that allowlist.
The _r structure:
{
"a": ["vi", "pd"], # Global actions allowed
"d": {"mydb": ["vt", "es"]}, # Database-level actions
"r": {"mydb": {"users": ["vt"]}} # Table-level actions
}
"""
if not actor:
return None
restrictions = ActorRestrictions.from_actor(actor)
if restrictions is None:
# No restrictions - all resources allowed
return []
# If globally allowed, no filtering needed
if restrictions.is_action_globally_allowed(datasette, action):
return []
# Build restriction SQL
allowed_dbs = restrictions.get_allowed_databases(datasette, action)
allowed_tables = restrictions.get_allowed_tables(datasette, action)
# If nothing is allowed for this action, return empty-set restriction
if not allowed_dbs and not allowed_tables:
return [
PermissionSQL(
params={"deny": f"actor restrictions: {action} not in allowlist"},
restriction_sql="SELECT NULL AS parent, NULL AS child WHERE 0",
)
]
# Build UNION of allowed resources
selects = []
params = {}
counter = 0
# Database-level entries (parent, NULL) - allows all children
for db_name in allowed_dbs:
key = f"restr_{counter}"
counter += 1
selects.append(f"SELECT :{key}_parent AS parent, NULL AS child")
params[f"{key}_parent"] = db_name
# Table-level entries (parent, child)
for db_name, table_name in allowed_tables:
key = f"restr_{counter}"
counter += 1
selects.append(f"SELECT :{key}_parent AS parent, :{key}_child AS child")
params[f"{key}_parent"] = db_name
params[f"{key}_child"] = table_name
restriction_sql = "\nUNION ALL\n".join(selects)
return [PermissionSQL(params=params, restriction_sql=restriction_sql)]
def restrictions_allow_action(
datasette: "Datasette",
restrictions: dict,
action: str,
resource: Optional[str | Tuple[str, str]],
) -> bool:
"""
Check if restrictions allow the requested action on the requested resource.
This is a synchronous utility function for use by other code that needs
to quickly check restriction allowlists.
Args:
datasette: The Datasette instance
restrictions: The _r dict from an actor
action: The action name to check
resource: None for global, str for database, (db, table) tuple for table
Returns:
True if allowed, False if denied
"""
# Does this action have an abbreviation?
to_check = get_action_name_variants(datasette, action)
# Check global level (any resource)
all_allowed = restrictions.get("a")
if all_allowed is not None:
assert isinstance(all_allowed, list)
if to_check.intersection(all_allowed):
return True
# Check database level
if resource:
if isinstance(resource, str):
database_name = resource
else:
database_name = resource[0]
database_allowed = restrictions.get("d", {}).get(database_name)
if database_allowed is not None:
assert isinstance(database_allowed, list)
if to_check.intersection(database_allowed):
return True
# Check table/resource level
if resource is not None and not isinstance(resource, str) and len(resource) == 2:
database, table = resource
table_allowed = restrictions.get("r", {}).get(database, {}).get(table)
if table_allowed is not None:
assert isinstance(table_allowed, list)
if to_check.intersection(table_allowed):
return True
# This action is not explicitly allowed, so reject it
return False

View file

@ -0,0 +1,29 @@
"""
Root user permission handling for Datasette.
Grants full permissions to the root user when --root flag is used.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
@hookimpl(specname="permission_resources_sql")
async def root_user_permissions_sql(
datasette: "Datasette",
actor: Optional[dict],
) -> Optional[PermissionSQL]:
"""
Grant root user full permissions when --root flag is used.
"""
if not datasette.root_enabled:
return None
if actor is not None and actor.get("id") == "root":
return PermissionSQL.allow(reason="root user")

View file

@ -0,0 +1,95 @@
"""
Token authentication for Datasette.
Handles signed API tokens (dstok_ prefix).
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
import itsdangerous
from datasette import hookimpl
@hookimpl(specname="actor_from_request")
def actor_from_signed_api_token(datasette: "Datasette", request) -> Optional[dict]:
"""
Authenticate requests using signed API tokens (dstok_ prefix).
Token structure (signed JSON):
{
"a": "actor_id", # Actor ID
"t": 1234567890, # Timestamp (Unix epoch)
"d": 3600, # Optional: Duration in seconds
"_r": {...} # Optional: Restrictions
}
"""
prefix = "dstok_"
# Check if tokens are enabled
if not datasette.setting("allow_signed_tokens"):
return None
max_signed_tokens_ttl = datasette.setting("max_signed_tokens_ttl")
# Get authorization header
authorization = request.headers.get("authorization")
if not authorization:
return None
if not authorization.startswith("Bearer "):
return None
token = authorization[len("Bearer ") :]
if not token.startswith(prefix):
return None
# Remove prefix and verify signature
token = token[len(prefix) :]
try:
decoded = datasette.unsign(token, namespace="token")
except itsdangerous.BadSignature:
return None
# Validate timestamp
if "t" not in decoded:
return None
created = decoded["t"]
if not isinstance(created, int):
return None
# Handle duration/expiry
duration = decoded.get("d")
if duration is not None and not isinstance(duration, int):
return None
# Apply max TTL if configured
if (duration is None and max_signed_tokens_ttl) or (
duration is not None
and max_signed_tokens_ttl
and duration > max_signed_tokens_ttl
):
duration = max_signed_tokens_ttl
# Check expiry
if duration:
if time.time() - created > duration:
return None
# Build actor dict
actor = {"id": decoded["a"], "token": "dstok"}
# Copy restrictions if present
if "_r" in decoded:
actor["_r"] = decoded["_r"]
# Add expiry timestamp if applicable
if duration:
actor["token_expires"] = created + duration
return actor

View file

@ -31,7 +31,7 @@
<td><strong>{{ action.name }}</strong></td>
<td>{% if action.abbr %}<code>{{ action.abbr }}</code>{% endif %}</td>
<td>{{ action.description or "" }}</td>
<td><code>{{ action.resource_class }}</code></td>
<td>{% if action.resource_class %}<code>{{ action.resource_class }}</code>{% endif %}</td>
<td>{% if action.takes_parent %}✓{% endif %}</td>
<td>{% if action.takes_child %}✓{% endif %}</td>
<td>{% if action.also_requires %}<code>{{ action.also_requires }}</code>{% endif %}</td>

View file

@ -1,2 +1,2 @@
__version__ = "1.0a22"
__version__ = "1.0a23"
__version_info__ = tuple(__version__.split("."))

View file

@ -4,6 +4,14 @@
Changelog
=========
.. _v1_0_a23:
1.0a23 (2025-12-02)
-------------------
- Fix for bug where a stale database entry in ``internal.db`` could cause a 500 error on the homepage. (:issue:`2605`)
- Cosmetic improvement to ``/-/actions`` page. (:issue:`2599`)
.. _v1_0_a22:
1.0a22 (2025-11-13)

View file

@ -69,7 +69,7 @@ docs = [
"ruamel.yaml",
]
test = [
"pytest>=5.2.2",
"pytest>=9",
"pytest-xdist>=2.2.1",
"pytest-asyncio>=1.2.0",
"beautifulsoup4>=4.8.1",
@ -93,3 +93,6 @@ datasette = ["templates/*.html"]
[tool.setuptools.dynamic]
version = {attr = "datasette.version.__version__"}
[tool.uv]
package = true

View file

@ -28,9 +28,10 @@ def settings_headings():
return get_headings((docs_path / "settings.rst").read_text(), "~")
@pytest.mark.parametrize("setting", app.SETTINGS)
def test_settings_are_documented(settings_headings, setting):
assert setting.name in settings_headings
def test_settings_are_documented(settings_headings, subtests):
for setting in app.SETTINGS:
with subtests.test(setting=setting.name):
assert setting.name in settings_headings
@pytest.fixture(scope="session")
@ -38,21 +39,21 @@ def plugin_hooks_content():
return (docs_path / "plugin_hooks.rst").read_text()
@pytest.mark.parametrize(
"plugin", [name for name in dir(app.pm.hook) if not name.startswith("_")]
)
def test_plugin_hooks_are_documented(plugin, plugin_hooks_content):
def test_plugin_hooks_are_documented(plugin_hooks_content, subtests):
headings = set()
headings.update(get_headings(plugin_hooks_content, "-"))
headings.update(get_headings(plugin_hooks_content, "~"))
assert plugin in headings
hook_caller = getattr(app.pm.hook, plugin)
arg_names = [a for a in hook_caller.spec.argnames if a != "__multicall__"]
# Check for plugin_name(arg1, arg2, arg3)
expected = f"{plugin}({', '.join(arg_names)})"
assert (
expected in plugin_hooks_content
), f"Missing from plugin hook documentation: {expected}"
plugins = [name for name in dir(app.pm.hook) if not name.startswith("_")]
for plugin in plugins:
with subtests.test(plugin=plugin):
assert plugin in headings
hook_caller = getattr(app.pm.hook, plugin)
arg_names = [a for a in hook_caller.spec.argnames if a != "__multicall__"]
# Check for plugin_name(arg1, arg2, arg3)
expected = f"{plugin}({', '.join(arg_names)})"
assert (
expected in plugin_hooks_content
), f"Missing from plugin hook documentation: {expected}"
@pytest.fixture(scope="session")
@ -68,9 +69,11 @@ def documented_views():
return view_labels
@pytest.mark.parametrize("view_class", [v for v in dir(app) if v.endswith("View")])
def test_view_classes_are_documented(documented_views, view_class):
assert view_class in documented_views
def test_view_classes_are_documented(documented_views, subtests):
view_classes = [v for v in dir(app) if v.endswith("View")]
for view_class in view_classes:
with subtests.test(view_class=view_class):
assert view_class in documented_views
@pytest.fixture(scope="session")
@ -85,9 +88,10 @@ def documented_table_filters():
}
@pytest.mark.parametrize("filter", [f.key for f in Filters._filters])
def test_table_filters_are_documented(documented_table_filters, filter):
assert filter in documented_table_filters
def test_table_filters_are_documented(documented_table_filters, subtests):
for f in Filters._filters:
with subtests.test(filter=f.key):
assert f.key in documented_table_filters
@pytest.fixture(scope="session")
@ -101,9 +105,10 @@ def documented_fns():
}
@pytest.mark.parametrize("fn", utils.functions_marked_as_documented)
def test_functions_marked_with_documented_are_documented(documented_fns, fn):
assert fn.__name__ in documented_fns
def test_functions_marked_with_documented_are_documented(documented_fns, subtests):
for fn in utils.functions_marked_as_documented:
with subtests.test(fn=fn.__name__):
assert fn.__name__ in documented_fns
def test_rst_heading_underlines_match_title_length():

View file

@ -1194,6 +1194,21 @@ async def test_actions_page(ds_client):
ds_client.ds.root_enabled = original_root_enabled
@pytest.mark.asyncio
async def test_actions_page_does_not_display_none_string(ds_client):
"""Ensure the Resource column doesn't display the string 'None' for null values."""
# https://github.com/simonw/datasette/issues/2599
original_root_enabled = ds_client.ds.root_enabled
try:
ds_client.ds.root_enabled = True
cookies = {"ds_actor": ds_client.actor_cookie({"id": "root"})}
response = await ds_client.get("/-/actions", cookies=cookies)
assert response.status_code == 200
assert "<code>None</code>" not in response.text
finally:
ds_client.ds.root_enabled = original_root_enabled
@pytest.mark.asyncio
async def test_permission_debug_tabs_with_query_string(ds_client):
"""Test that navigation tabs persist query strings across Check, Allowed, and Rules pages"""

View file

@ -91,3 +91,51 @@ async def test_internal_foreign_key_references(ds_client):
)
await internal_db.execute_fn(inner)
@pytest.mark.asyncio
async def test_stale_catalog_entry_database_fix(tmp_path):
"""
Test for https://github.com/simonw/datasette/issues/2605
When the internal database persists across restarts and has entries in
catalog_databases for databases that no longer exist, accessing the
index page should not cause a 500 error (KeyError).
"""
from datasette.app import Datasette
internal_db_path = str(tmp_path / "internal.db")
data_db_path = str(tmp_path / "data.db")
# Create a data database file
import sqlite3
conn = sqlite3.connect(data_db_path)
conn.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY)")
conn.close()
# First Datasette instance: with the data database and persistent internal db
ds1 = Datasette(files=[data_db_path], internal=internal_db_path)
await ds1.invoke_startup()
# Access the index page to populate the internal catalog
response = await ds1.client.get("/")
assert "data" in ds1.databases
assert response.status_code == 200
# Second Datasette instance: reusing internal.db but WITHOUT the data database
# This simulates restarting Datasette after removing a database
ds2 = Datasette(internal=internal_db_path)
await ds2.invoke_startup()
# The database is not in ds2.databases
assert "data" not in ds2.databases
# Accessing the index page should NOT cause a 500 error
# This is the bug: it currently raises KeyError when trying to
# access ds.databases["data"] for the stale catalog entry
response = await ds2.client.get("/")
assert response.status_code == 200, (
f"Index page should return 200, not {response.status_code}. "
"This fails due to stale catalog entries causing KeyError."
)

View file

@ -1323,6 +1323,20 @@ async def test_actor_restrictions(
("dbname2", "tablename"),
False,
),
# Table-level restriction allows access to that specific table
(
{"r": {"dbname": {"tablename": ["view-table"]}}},
"view-table",
("dbname", "tablename"),
True,
),
# But not to a different table in the same database
(
{"r": {"dbname": {"tablename": ["view-table"]}}},
"view-table",
("dbname", "other_table"),
False,
),
),
)
async def test_restrictions_allow_action(restrictions, action, resource, expected):
@ -1653,3 +1667,48 @@ async def test_permission_check_view_requires_debug_permission():
data = response.json()
assert data["action"] == "view-instance"
assert data["allowed"] is True
@pytest.mark.asyncio
async def test_root_allow_block_with_table_restricted_actor():
"""
Test that root-level allow: blocks are processed for actors with
table-level restrictions.
This covers the case in config.py is_in_restriction_allowlist() where
parent=None, child=None and actor has table restrictions but not global.
"""
from datasette.resources import TableResource
# Config with root-level allow block that denies non-admin users
ds = Datasette(
config={
"allow": {"id": "admin"}, # Root-level allow block
}
)
await ds.invoke_startup()
db = ds.add_memory_database("mydb")
await db.execute_write("create table t1 (id integer primary key)")
await ds.client.get("/") # Trigger catalog refresh
# Actor with table-level restrictions only (not global)
actor = {"id": "user", "_r": {"r": {"mydb": {"t1": ["view-table"]}}}}
# The root-level allow: {id: admin} should be processed and deny this user
# because they're not "admin", even though they have table restrictions
result = await ds.allowed(
action="view-table",
resource=TableResource("mydb", "t1"),
actor=actor,
)
# Should be False because root allow: {id: admin} denies non-admin users
assert result is False
# But admin with same restrictions should be allowed
admin_actor = {"id": "admin", "_r": {"r": {"mydb": {"t1": ["view-table"]}}}}
result = await ds.allowed(
action="view-table",
resource=TableResource("mydb", "t1"),
actor=admin_actor,
)
assert result is True