mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
The bridge was incorrectly using the new allowed() method which applies default allow rules. This caused actors without restrictions to get True instead of USE_DEFAULT, breaking backward compatibility. Fixed by: - Removing the code that converted to resource objects and called allowed() - Bridge now ONLY checks config-based rules via _config_permission_rules() - Returns None when no config rules exist, allowing Permission.default to apply - This maintains backward compatibility with the permission_allowed() API All 177 permission tests now pass, including test_actor_restricted_permissions and test_permissions_checked which were previously failing.
546 lines
19 KiB
Python
546 lines
19 KiB
Python
from datasette import hookimpl, Permission
|
|
from datasette.permissions import PermissionSQL
|
|
from datasette.utils import actor_matches_allow
|
|
import itsdangerous
|
|
import time
|
|
|
|
|
|
@hookimpl
|
|
def register_permissions():
|
|
return (
|
|
Permission(
|
|
name="view-instance",
|
|
abbr="vi",
|
|
description="View Datasette instance",
|
|
takes_database=False,
|
|
takes_resource=False,
|
|
default=True,
|
|
),
|
|
Permission(
|
|
name="view-database",
|
|
abbr="vd",
|
|
description="View database",
|
|
takes_database=True,
|
|
takes_resource=False,
|
|
default=True,
|
|
implies_can_view=True,
|
|
),
|
|
Permission(
|
|
name="view-database-download",
|
|
abbr="vdd",
|
|
description="Download database file",
|
|
takes_database=True,
|
|
takes_resource=False,
|
|
default=True,
|
|
),
|
|
Permission(
|
|
name="view-table",
|
|
abbr="vt",
|
|
description="View table",
|
|
takes_database=True,
|
|
takes_resource=True,
|
|
default=True,
|
|
implies_can_view=True,
|
|
),
|
|
Permission(
|
|
name="view-query",
|
|
abbr="vq",
|
|
description="View named query results",
|
|
takes_database=True,
|
|
takes_resource=True,
|
|
default=True,
|
|
implies_can_view=True,
|
|
),
|
|
Permission(
|
|
name="execute-sql",
|
|
abbr="es",
|
|
description="Execute read-only SQL queries",
|
|
takes_database=True,
|
|
takes_resource=False,
|
|
default=True,
|
|
implies_can_view=True,
|
|
),
|
|
Permission(
|
|
name="permissions-debug",
|
|
abbr="pd",
|
|
description="Access permission debug tool",
|
|
takes_database=False,
|
|
takes_resource=False,
|
|
default=False,
|
|
),
|
|
Permission(
|
|
name="debug-menu",
|
|
abbr="dm",
|
|
description="View debug menu items",
|
|
takes_database=False,
|
|
takes_resource=False,
|
|
default=False,
|
|
),
|
|
Permission(
|
|
name="insert-row",
|
|
abbr="ir",
|
|
description="Insert rows",
|
|
takes_database=True,
|
|
takes_resource=True,
|
|
default=False,
|
|
),
|
|
Permission(
|
|
name="delete-row",
|
|
abbr="dr",
|
|
description="Delete rows",
|
|
takes_database=True,
|
|
takes_resource=True,
|
|
default=False,
|
|
),
|
|
Permission(
|
|
name="update-row",
|
|
abbr="ur",
|
|
description="Update rows",
|
|
takes_database=True,
|
|
takes_resource=True,
|
|
default=False,
|
|
),
|
|
Permission(
|
|
name="create-table",
|
|
abbr="ct",
|
|
description="Create tables",
|
|
takes_database=True,
|
|
takes_resource=False,
|
|
default=False,
|
|
),
|
|
Permission(
|
|
name="alter-table",
|
|
abbr="at",
|
|
description="Alter tables",
|
|
takes_database=True,
|
|
takes_resource=True,
|
|
default=False,
|
|
),
|
|
Permission(
|
|
name="drop-table",
|
|
abbr="dt",
|
|
description="Drop tables",
|
|
takes_database=True,
|
|
takes_resource=True,
|
|
default=False,
|
|
),
|
|
)
|
|
|
|
|
|
@hookimpl(tryfirst=True, specname="permission_allowed")
|
|
async def permission_allowed_sql_bridge(datasette, actor, action, resource):
|
|
"""
|
|
Bridge config-based permission rules to the old permission_allowed API.
|
|
|
|
This allows views using the old string/tuple resource API to benefit from
|
|
config blocks defined in datasette.yaml without using the new resource-based system.
|
|
|
|
Note: This does NOT apply default allow rules - those should come from the
|
|
Permission object's default value to maintain backward compatibility.
|
|
"""
|
|
# Only check config-based rules - don't apply defaults
|
|
config_rules = await _config_permission_rules(datasette, actor, action)
|
|
if not config_rules:
|
|
return None
|
|
|
|
# Evaluate config rules for this specific resource
|
|
for rule in config_rules:
|
|
if rule.params: # Has config-based rules
|
|
from datasette.utils.permissions import resolve_permissions_with_candidates
|
|
|
|
# Build candidate based on resource
|
|
if resource is None:
|
|
candidates = [(None, None)]
|
|
elif isinstance(resource, str):
|
|
candidates = [(resource, None)]
|
|
elif isinstance(resource, tuple):
|
|
candidates = [(resource[0], resource[1])]
|
|
else:
|
|
return None
|
|
|
|
db = datasette.get_internal_database()
|
|
results = await resolve_permissions_with_candidates(
|
|
db, actor, [rule], candidates, action, implicit_deny=False
|
|
)
|
|
if results:
|
|
# Use the first result's allow value
|
|
for result in results:
|
|
if result.get("allow") is not None:
|
|
return bool(result["allow"])
|
|
return None
|
|
|
|
|
|
@hookimpl(tryfirst=True, specname="permission_allowed")
|
|
def permission_allowed_root(datasette, actor, action, resource):
|
|
"""
|
|
Grant all permissions to root user when Datasette started with --root flag.
|
|
|
|
The --root flag is a localhost development tool. When used, it sets
|
|
datasette.root_enabled = True and creates an actor with id="root".
|
|
This hook grants that actor all permissions.
|
|
|
|
Other plugins can use the same pattern: check datasette.root_enabled
|
|
to decide whether to honor root users.
|
|
"""
|
|
if datasette.root_enabled and actor and actor.get("id") == "root":
|
|
return True
|
|
return None
|
|
|
|
|
|
@hookimpl
|
|
async def permission_resources_sql(datasette, actor, action):
|
|
# Root user with root_enabled gets all permissions
|
|
if datasette.root_enabled and actor and actor.get("id") == "root":
|
|
# Return SQL that grants access to ALL resources for this action
|
|
action_obj = datasette.actions.get(action)
|
|
if action_obj and action_obj.resource_class:
|
|
resources_sql = action_obj.resource_class.resources_sql()
|
|
sql = f"""
|
|
SELECT parent, child, 1 AS allow, 'root user' AS reason
|
|
FROM ({resources_sql})
|
|
"""
|
|
return PermissionSQL(
|
|
source="root_permissions",
|
|
sql=sql,
|
|
params={},
|
|
)
|
|
|
|
rules: list[PermissionSQL] = []
|
|
|
|
config_rules = await _config_permission_rules(datasette, actor, action)
|
|
rules.extend(config_rules)
|
|
|
|
# Check default_allow_sql setting for execute-sql action
|
|
if action == "execute-sql" and not datasette.setting("default_allow_sql"):
|
|
# Return a deny rule for all databases
|
|
sql = "SELECT NULL AS parent, NULL AS child, 0 AS allow, 'default_allow_sql is false' AS reason"
|
|
rules.append(
|
|
PermissionSQL(
|
|
source="default_allow_sql_setting",
|
|
sql=sql,
|
|
params={},
|
|
)
|
|
)
|
|
|
|
default_allow_actions = {
|
|
"view-instance",
|
|
"view-database",
|
|
"view-table",
|
|
"execute-sql",
|
|
}
|
|
if action in default_allow_actions:
|
|
reason = f"default allow for {action}".replace("'", "''")
|
|
sql = (
|
|
"SELECT NULL AS parent, NULL AS child, 1 AS allow, " f"'{reason}' AS reason"
|
|
)
|
|
rules.append(
|
|
PermissionSQL(
|
|
source="default_permissions",
|
|
sql=sql,
|
|
params={},
|
|
)
|
|
)
|
|
|
|
if not rules:
|
|
return None
|
|
if len(rules) == 1:
|
|
return rules[0]
|
|
return rules
|
|
|
|
|
|
async def _config_permission_rules(datasette, actor, action) -> list[PermissionSQL]:
|
|
config = datasette.config or {}
|
|
|
|
if actor is None:
|
|
actor_dict: dict | None = None
|
|
elif isinstance(actor, dict):
|
|
actor_dict = actor
|
|
else:
|
|
actor_lookup = await datasette.actors_from_ids([actor])
|
|
actor_dict = actor_lookup.get(actor) or {"id": actor}
|
|
|
|
def evaluate(allow_block):
|
|
if allow_block is None:
|
|
return None
|
|
return actor_matches_allow(actor_dict, allow_block)
|
|
|
|
rows = []
|
|
|
|
def add_row(parent, child, result, scope):
|
|
if result is None:
|
|
return
|
|
rows.append(
|
|
(
|
|
parent,
|
|
child,
|
|
bool(result),
|
|
f"config {'allow' if result else 'deny'} {scope}",
|
|
)
|
|
)
|
|
|
|
def add_row_allow_block(parent, child, allow_block, scope):
|
|
"""For 'allow' blocks, always add a row if the block exists - deny if no match"""
|
|
if allow_block is None:
|
|
return
|
|
result = evaluate(allow_block)
|
|
# If result is None (no match) or False, treat as deny
|
|
rows.append(
|
|
(
|
|
parent,
|
|
child,
|
|
bool(result), # None becomes False, False stays False, True stays True
|
|
f"config {'allow' if result else 'deny'} {scope}",
|
|
)
|
|
)
|
|
|
|
root_perm = (config.get("permissions") or {}).get(action)
|
|
add_row(None, None, evaluate(root_perm), f"permissions for {action}")
|
|
|
|
for db_name, db_config in (config.get("databases") or {}).items():
|
|
db_perm = (db_config.get("permissions") or {}).get(action)
|
|
add_row(
|
|
db_name, None, evaluate(db_perm), f"permissions for {action} on {db_name}"
|
|
)
|
|
|
|
for table_name, table_config in (db_config.get("tables") or {}).items():
|
|
table_perm = (table_config.get("permissions") or {}).get(action)
|
|
add_row(
|
|
db_name,
|
|
table_name,
|
|
evaluate(table_perm),
|
|
f"permissions for {action} on {db_name}/{table_name}",
|
|
)
|
|
|
|
if action == "view-table":
|
|
table_allow = (table_config or {}).get("allow")
|
|
add_row_allow_block(
|
|
db_name,
|
|
table_name,
|
|
table_allow,
|
|
f"allow for {action} on {db_name}/{table_name}",
|
|
)
|
|
|
|
for query_name, query_config in (db_config.get("queries") or {}).items():
|
|
# query_config can be a string (just SQL) or a dict (with SQL and options)
|
|
if isinstance(query_config, dict):
|
|
query_perm = (query_config.get("permissions") or {}).get(action)
|
|
add_row(
|
|
db_name,
|
|
query_name,
|
|
evaluate(query_perm),
|
|
f"permissions for {action} on {db_name}/{query_name}",
|
|
)
|
|
if action == "view-query":
|
|
query_allow = query_config.get("allow")
|
|
add_row_allow_block(
|
|
db_name,
|
|
query_name,
|
|
query_allow,
|
|
f"allow for {action} on {db_name}/{query_name}",
|
|
)
|
|
|
|
if action == "view-database":
|
|
db_allow = db_config.get("allow")
|
|
add_row_allow_block(
|
|
db_name, None, db_allow, f"allow for {action} on {db_name}"
|
|
)
|
|
|
|
if action == "execute-sql":
|
|
db_allow_sql = db_config.get("allow_sql")
|
|
add_row_allow_block(db_name, None, db_allow_sql, f"allow_sql for {db_name}")
|
|
|
|
if action == "view-table":
|
|
# Database-level allow block affects all tables in that database
|
|
db_allow = db_config.get("allow")
|
|
add_row_allow_block(
|
|
db_name, None, db_allow, f"allow for {action} on {db_name}"
|
|
)
|
|
|
|
# Root-level allow block applies to all view-* actions
|
|
if action == "view-instance":
|
|
allow_block = config.get("allow")
|
|
add_row_allow_block(None, None, allow_block, "allow for view-instance")
|
|
|
|
if action == "view-database":
|
|
# Root-level allow block also applies to view-database
|
|
allow_block = config.get("allow")
|
|
add_row_allow_block(None, None, allow_block, "allow for view-database")
|
|
|
|
if action == "view-table":
|
|
# Root-level allow block also applies to view-table
|
|
allow_block = config.get("allow")
|
|
add_row_allow_block(None, None, allow_block, "allow for view-table")
|
|
|
|
if action == "view-query":
|
|
# Root-level allow block also applies to view-query
|
|
allow_block = config.get("allow")
|
|
add_row_allow_block(None, None, allow_block, "allow for view-query")
|
|
|
|
if action == "execute-sql":
|
|
allow_sql = config.get("allow_sql")
|
|
add_row_allow_block(None, None, allow_sql, "allow_sql")
|
|
|
|
if not rows:
|
|
return []
|
|
|
|
parts = []
|
|
params = {}
|
|
for idx, (parent, child, allow, reason) in enumerate(rows):
|
|
key = f"cfg_{idx}"
|
|
parts.append(
|
|
f"SELECT :{key}_parent AS parent, :{key}_child AS child, :{key}_allow AS allow, :{key}_reason AS reason"
|
|
)
|
|
params[f"{key}_parent"] = parent
|
|
params[f"{key}_child"] = child
|
|
params[f"{key}_allow"] = 1 if allow else 0
|
|
params[f"{key}_reason"] = reason
|
|
|
|
sql = "\nUNION ALL\n".join(parts)
|
|
return [PermissionSQL(source="config_permissions", sql=sql, params=params)]
|
|
|
|
|
|
def restrictions_allow_action(
|
|
datasette: "Datasette",
|
|
restrictions: dict,
|
|
action: str,
|
|
resource: str | tuple[str, str],
|
|
):
|
|
"Do these restrictions allow the requested action against the requested resource?"
|
|
if action == "view-instance":
|
|
# Special case for view-instance: it's allowed if the restrictions include any
|
|
# permissions that have the implies_can_view=True flag set
|
|
all_rules = restrictions.get("a") or []
|
|
for database_rules in (restrictions.get("d") or {}).values():
|
|
all_rules += database_rules
|
|
for database_resource_rules in (restrictions.get("r") or {}).values():
|
|
for resource_rules in database_resource_rules.values():
|
|
all_rules += resource_rules
|
|
permissions = [datasette.get_permission(action) for action in all_rules]
|
|
if any(p for p in permissions if p.implies_can_view):
|
|
return True
|
|
|
|
if action == "view-database":
|
|
# Special case for view-database: it's allowed if the restrictions include any
|
|
# permissions that have the implies_can_view=True flag set AND takes_database
|
|
all_rules = restrictions.get("a") or []
|
|
database_rules = list((restrictions.get("d") or {}).get(resource) or [])
|
|
all_rules += database_rules
|
|
resource_rules = ((restrictions.get("r") or {}).get(resource) or {}).values()
|
|
for resource_rules in (restrictions.get("r") or {}).values():
|
|
for table_rules in resource_rules.values():
|
|
all_rules += table_rules
|
|
permissions = [datasette.get_permission(action) for action in all_rules]
|
|
if any(p for p in permissions if p.implies_can_view and p.takes_database):
|
|
return True
|
|
|
|
# Does this action have an abbreviation?
|
|
to_check = {action}
|
|
permission = datasette.permissions.get(action)
|
|
if permission and permission.abbr:
|
|
to_check.add(permission.abbr)
|
|
|
|
# If restrictions is defined then we use those to further restrict the actor
|
|
# Crucially, we only use this to say NO (return False) - we never
|
|
# use it to return YES (True) because that might over-ride other
|
|
# restrictions placed on this actor
|
|
all_allowed = restrictions.get("a")
|
|
if all_allowed is not None:
|
|
assert isinstance(all_allowed, list)
|
|
if to_check.intersection(all_allowed):
|
|
return True
|
|
# How about for the current database?
|
|
if resource:
|
|
if isinstance(resource, str):
|
|
database_name = resource
|
|
else:
|
|
database_name = resource[0]
|
|
database_allowed = restrictions.get("d", {}).get(database_name)
|
|
if database_allowed is not None:
|
|
assert isinstance(database_allowed, list)
|
|
if to_check.intersection(database_allowed):
|
|
return True
|
|
# Or the current table? That's any time the resource is (database, table)
|
|
if resource is not None and not isinstance(resource, str) and len(resource) == 2:
|
|
database, table = resource
|
|
table_allowed = restrictions.get("r", {}).get(database, {}).get(table)
|
|
# TODO: What should this do for canned queries?
|
|
if table_allowed is not None:
|
|
assert isinstance(table_allowed, list)
|
|
if to_check.intersection(table_allowed):
|
|
return True
|
|
|
|
# This action is not specifically allowed, so reject it
|
|
return False
|
|
|
|
|
|
@hookimpl(specname="permission_allowed")
|
|
def permission_allowed_actor_restrictions(datasette, actor, action, resource):
|
|
if actor is None:
|
|
return None
|
|
if "_r" not in actor:
|
|
# No restrictions, so we have no opinion
|
|
return None
|
|
_r = actor.get("_r")
|
|
if restrictions_allow_action(datasette, _r, action, resource):
|
|
# Return None because we do not have an opinion here
|
|
return None
|
|
else:
|
|
# Block this permission check
|
|
return False
|
|
|
|
|
|
@hookimpl
|
|
def actor_from_request(datasette, request):
|
|
prefix = "dstok_"
|
|
if not datasette.setting("allow_signed_tokens"):
|
|
return None
|
|
max_signed_tokens_ttl = datasette.setting("max_signed_tokens_ttl")
|
|
authorization = request.headers.get("authorization")
|
|
if not authorization:
|
|
return None
|
|
if not authorization.startswith("Bearer "):
|
|
return None
|
|
token = authorization[len("Bearer ") :]
|
|
if not token.startswith(prefix):
|
|
return None
|
|
token = token[len(prefix) :]
|
|
try:
|
|
decoded = datasette.unsign(token, namespace="token")
|
|
except itsdangerous.BadSignature:
|
|
return None
|
|
if "t" not in decoded:
|
|
# Missing timestamp
|
|
return None
|
|
created = decoded["t"]
|
|
if not isinstance(created, int):
|
|
# Invalid timestamp
|
|
return None
|
|
duration = decoded.get("d")
|
|
if duration is not None and not isinstance(duration, int):
|
|
# Invalid duration
|
|
return None
|
|
if (duration is None and max_signed_tokens_ttl) or (
|
|
duration is not None
|
|
and max_signed_tokens_ttl
|
|
and duration > max_signed_tokens_ttl
|
|
):
|
|
duration = max_signed_tokens_ttl
|
|
if duration:
|
|
if time.time() - created > duration:
|
|
# Expired
|
|
return None
|
|
actor = {"id": decoded["a"], "token": "dstok"}
|
|
if "_r" in decoded:
|
|
actor["_r"] = decoded["_r"]
|
|
if duration:
|
|
actor["token_expires"] = created + duration
|
|
return actor
|
|
|
|
|
|
@hookimpl
|
|
def skip_csrf(scope):
|
|
# Skip CSRF check for requests with content-type: application/json
|
|
if scope["type"] == "http":
|
|
headers = scope.get("headers") or {}
|
|
if dict(headers).get(b"content-type") == b"application/json":
|
|
return True
|