datasette/tests/test_utils_permissions.py
Simon Willison e8b79970fb Implement also_requires to enforce view-database for execute-sql
Adds Action.also_requires field to specify dependencies between permissions.
When an action has also_requires set, users must have permission for BOTH
the main action AND the required action on a resource.

Applies this to execute-sql, which now requires view-database permission.
This prevents the illogical scenario where users can execute SQL on a
database they cannot view.

Changes:
- Add also_requires field to Action dataclass in datasette/permissions.py
- Update execute-sql action with also_requires="view-database"
- Implement also_requires handling in build_allowed_resources_sql()
- Implement also_requires handling in AllowedResourcesView endpoint
- Add test verifying execute-sql requires view-database permission

Fixes #2527
2025-10-24 12:14:52 -07:00

631 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pytest
from datasette.app import Datasette
from datasette.permissions import PermissionSQL
from datasette.utils.permissions import resolve_permissions_from_catalog
from typing import Callable, List
@pytest.fixture
def db():
ds = Datasette()
import tempfile
from datasette.database import Database
path = tempfile.mktemp(suffix="demo.db")
db = ds.add_database(Database(ds, path=path))
print(path)
return db
NO_RULES_SQL = (
"SELECT NULL AS parent, NULL AS child, NULL AS allow, NULL AS reason WHERE 0"
)
def plugin_allow_all_for_user(user: str) -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
return PermissionSQL(
"allow_all",
"""
SELECT NULL AS parent, NULL AS child, 1 AS allow,
'global allow for ' || :allow_all_user || ' on ' || :allow_all_action AS reason
WHERE :actor_id = :allow_all_user
""",
{"allow_all_user": user, "allow_all_action": action},
)
return provider
def plugin_deny_specific_table(
user: str, parent: str, child: str
) -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
return PermissionSQL(
"deny_specific_table",
"""
SELECT :deny_specific_table_parent AS parent, :deny_specific_table_child AS child, 0 AS allow,
'deny ' || :deny_specific_table_parent || '/' || :deny_specific_table_child || ' for ' || :deny_specific_table_user || ' on ' || :deny_specific_table_action AS reason
WHERE :actor_id = :deny_specific_table_user
""",
{
"deny_specific_table_parent": parent,
"deny_specific_table_child": child,
"deny_specific_table_user": user,
"deny_specific_table_action": action,
},
)
return provider
def plugin_org_policy_deny_parent(parent: str) -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
return PermissionSQL(
"org_policy_parent_deny",
"""
SELECT :org_policy_parent_deny_parent AS parent, NULL AS child, 0 AS allow,
'org policy: parent ' || :org_policy_parent_deny_parent || ' denied on ' || :org_policy_parent_deny_action AS reason
""",
{
"org_policy_parent_deny_parent": parent,
"org_policy_parent_deny_action": action,
},
)
return provider
def plugin_allow_parent_for_user(
user: str, parent: str
) -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
return PermissionSQL(
"allow_parent",
"""
SELECT :allow_parent_parent AS parent, NULL AS child, 1 AS allow,
'allow full parent for ' || :allow_parent_user || ' on ' || :allow_parent_action AS reason
WHERE :actor_id = :allow_parent_user
""",
{
"allow_parent_parent": parent,
"allow_parent_user": user,
"allow_parent_action": action,
},
)
return provider
def plugin_child_allow_for_user(
user: str, parent: str, child: str
) -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
return PermissionSQL(
"allow_child",
"""
SELECT :allow_child_parent AS parent, :allow_child_child AS child, 1 AS allow,
'allow child for ' || :allow_child_user || ' on ' || :allow_child_action AS reason
WHERE :actor_id = :allow_child_user
""",
{
"allow_child_parent": parent,
"allow_child_child": child,
"allow_child_user": user,
"allow_child_action": action,
},
)
return provider
def plugin_root_deny_for_all() -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
return PermissionSQL(
"root_deny",
"""
SELECT NULL AS parent, NULL AS child, 0 AS allow, 'root deny for all on ' || :root_deny_action AS reason
""",
{"root_deny_action": action},
)
return provider
def plugin_conflicting_same_child_rules(
user: str, parent: str, child: str
) -> List[Callable[[str], PermissionSQL]]:
def allow_provider(action: str) -> PermissionSQL:
return PermissionSQL(
"conflict_child_allow",
"""
SELECT :conflict_child_allow_parent AS parent, :conflict_child_allow_child AS child, 1 AS allow,
'team grant at child for ' || :conflict_child_allow_user || ' on ' || :conflict_child_allow_action AS reason
WHERE :actor_id = :conflict_child_allow_user
""",
{
"conflict_child_allow_parent": parent,
"conflict_child_allow_child": child,
"conflict_child_allow_user": user,
"conflict_child_allow_action": action,
},
)
def deny_provider(action: str) -> PermissionSQL:
return PermissionSQL(
"conflict_child_deny",
"""
SELECT :conflict_child_deny_parent AS parent, :conflict_child_deny_child AS child, 0 AS allow,
'exception deny at child for ' || :conflict_child_deny_user || ' on ' || :conflict_child_deny_action AS reason
WHERE :actor_id = :conflict_child_deny_user
""",
{
"conflict_child_deny_parent": parent,
"conflict_child_deny_child": child,
"conflict_child_deny_user": user,
"conflict_child_deny_action": action,
},
)
return [allow_provider, deny_provider]
def plugin_allow_all_for_action(
user: str, allowed_action: str
) -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
if action != allowed_action:
return PermissionSQL(
f"allow_all_{allowed_action}_noop",
NO_RULES_SQL,
{},
)
source_name = f"allow_all_{allowed_action}"
# Sanitize parameter names by replacing hyphens with underscores
param_prefix = source_name.replace("-", "_")
return PermissionSQL(
source_name,
f"""
SELECT NULL AS parent, NULL AS child, 1 AS allow,
'global allow for ' || :{param_prefix}_user || ' on ' || :{param_prefix}_action AS reason
WHERE :actor_id = :{param_prefix}_user
""",
{f"{param_prefix}_user": user, f"{param_prefix}_action": action},
)
return provider
VIEW_TABLE = "view-table"
# ---------- Catalog DDL (from your schema) ----------
CATALOG_DDL = """
CREATE TABLE IF NOT EXISTS catalog_databases (
database_name TEXT PRIMARY KEY,
path TEXT,
is_memory INTEGER,
schema_version INTEGER
);
CREATE TABLE IF NOT EXISTS catalog_tables (
database_name TEXT,
table_name TEXT,
rootpage INTEGER,
sql TEXT,
PRIMARY KEY (database_name, table_name),
FOREIGN KEY (database_name) REFERENCES catalog_databases(database_name)
);
"""
PARENTS = ["accounting", "hr", "analytics"]
SPECIALS = {"accounting": ["sales"], "analytics": ["secret"], "hr": []}
TABLE_CANDIDATES_SQL = (
"SELECT database_name AS parent, table_name AS child FROM catalog_tables"
)
PARENT_CANDIDATES_SQL = (
"SELECT database_name AS parent, NULL AS child FROM catalog_databases"
)
# ---------- Helpers ----------
async def seed_catalog(db, per_parent: int = 10) -> None:
await db.execute_write_script(CATALOG_DDL)
# databases
db_rows = [(p, f"/{p}.db", 0, 1) for p in PARENTS]
await db.execute_write_many(
"INSERT OR REPLACE INTO catalog_databases(database_name, path, is_memory, schema_version) VALUES (?,?,?,?)",
db_rows,
)
# tables
def tables_for(parent: str, n: int):
base = [f"table{i:02d}" for i in range(1, n + 1)]
for s in SPECIALS.get(parent, []):
if s not in base:
base[0] = s
return base
table_rows = []
for p in PARENTS:
for t in tables_for(p, per_parent):
table_rows.append((p, t, 0, f"CREATE TABLE {t} (id INTEGER PRIMARY KEY)"))
await db.execute_write_many(
"INSERT OR REPLACE INTO catalog_tables(database_name, table_name, rootpage, sql) VALUES (?,?,?,?)",
table_rows,
)
def res_allowed(rows, parent=None):
return sorted(
r["resource"]
for r in rows
if r["allow"] == 1 and (parent is None or r["parent"] == parent)
)
def res_denied(rows, parent=None):
return sorted(
r["resource"]
for r in rows
if r["allow"] == 0 and (parent is None or r["parent"] == parent)
)
# ---------- Tests ----------
@pytest.mark.asyncio
async def test_alice_global_allow_with_specific_denies_catalog(db):
await seed_catalog(db)
plugins = [
plugin_allow_all_for_user("alice"),
plugin_deny_specific_table("alice", "accounting", "sales"),
plugin_org_policy_deny_parent("hr"),
]
rows = await resolve_permissions_from_catalog(
db,
{"id": "alice"},
plugins,
VIEW_TABLE,
TABLE_CANDIDATES_SQL,
implicit_deny=True,
)
# Alice can see everything except accounting/sales and hr/*
assert "/accounting/sales" in res_denied(rows)
for r in rows:
if r["parent"] == "hr":
assert r["allow"] == 0
elif r["resource"] == "/accounting/sales":
assert r["allow"] == 0
else:
assert r["allow"] == 1
@pytest.mark.asyncio
async def test_carol_parent_allow_but_child_conflict_deny_wins_catalog(db):
await seed_catalog(db)
plugins = [
plugin_org_policy_deny_parent("hr"),
plugin_allow_parent_for_user("carol", "analytics"),
*plugin_conflicting_same_child_rules("carol", "analytics", "secret"),
]
rows = await resolve_permissions_from_catalog(
db,
{"id": "carol"},
plugins,
VIEW_TABLE,
TABLE_CANDIDATES_SQL,
implicit_deny=True,
)
allowed_analytics = res_allowed(rows, parent="analytics")
denied_analytics = res_denied(rows, parent="analytics")
assert "/analytics/secret" in denied_analytics
# 10 analytics children total, 1 denied
assert len(allowed_analytics) == 9
@pytest.mark.asyncio
async def test_specificity_child_allow_overrides_parent_deny_catalog(db):
await seed_catalog(db)
plugins = [
plugin_allow_all_for_user("alice"),
plugin_org_policy_deny_parent("analytics"), # parent-level deny
plugin_child_allow_for_user(
"alice", "analytics", "table02"
), # child allow beats parent deny
]
rows = await resolve_permissions_from_catalog(
db,
{"id": "alice"},
plugins,
VIEW_TABLE,
TABLE_CANDIDATES_SQL,
implicit_deny=True,
)
# table02 allowed, other analytics tables denied
assert any(r["resource"] == "/analytics/table02" and r["allow"] == 1 for r in rows)
assert all(
(r["parent"] != "analytics" or r["child"] == "table02" or r["allow"] == 0)
for r in rows
)
@pytest.mark.asyncio
async def test_root_deny_all_but_parent_allow_rescues_specific_parent_catalog(db):
await seed_catalog(db)
plugins = [
plugin_root_deny_for_all(), # root deny
plugin_allow_parent_for_user(
"bob", "accounting"
), # parent allow (more specific)
]
rows = await resolve_permissions_from_catalog(
db, {"id": "bob"}, plugins, VIEW_TABLE, TABLE_CANDIDATES_SQL, implicit_deny=True
)
for r in rows:
if r["parent"] == "accounting":
assert r["allow"] == 1
else:
assert r["allow"] == 0
@pytest.mark.asyncio
async def test_parent_scoped_candidates(db):
await seed_catalog(db)
plugins = [
plugin_org_policy_deny_parent("hr"),
plugin_allow_parent_for_user("carol", "analytics"),
]
rows = await resolve_permissions_from_catalog(
db,
{"id": "carol"},
plugins,
VIEW_TABLE,
PARENT_CANDIDATES_SQL,
implicit_deny=True,
)
d = {r["resource"]: r["allow"] for r in rows}
assert d["/analytics"] == 1
assert d["/hr"] == 0
@pytest.mark.asyncio
async def test_implicit_deny_behavior(db):
await seed_catalog(db)
plugins = [] # no rules at all
# implicit_deny=True -> everything denied with reason 'implicit deny'
rows = await resolve_permissions_from_catalog(
db,
{"id": "erin"},
plugins,
VIEW_TABLE,
TABLE_CANDIDATES_SQL,
implicit_deny=True,
)
assert all(r["allow"] == 0 and r["reason"] == "implicit deny" for r in rows)
# implicit_deny=False -> no winner => allow is None, reason is None
rows2 = await resolve_permissions_from_catalog(
db,
{"id": "erin"},
plugins,
VIEW_TABLE,
TABLE_CANDIDATES_SQL,
implicit_deny=False,
)
assert all(r["allow"] is None and r["reason"] is None for r in rows2)
@pytest.mark.asyncio
async def test_candidate_filters_via_params(db):
await seed_catalog(db)
# Add some metadata to test filtering
# Mark 'hr' as is_memory=1 and increment analytics schema_version
await db.execute_write(
"UPDATE catalog_databases SET is_memory=1 WHERE database_name='hr'"
)
await db.execute_write(
"UPDATE catalog_databases SET schema_version=2 WHERE database_name='analytics'"
)
# Candidate SQL that filters by db metadata via params
candidate_sql = """
SELECT t.database_name AS parent, t.table_name AS child
FROM catalog_tables t
JOIN catalog_databases d ON d.database_name = t.database_name
WHERE (:exclude_memory = 1 AND d.is_memory = 1) IS NOT 1
AND (:min_schema_version IS NULL OR d.schema_version >= :min_schema_version)
"""
plugins = [
plugin_root_deny_for_all(),
plugin_allow_parent_for_user(
"dev", "analytics"
), # analytics rescued if included by candidates
]
# Case 1: exclude memory dbs, require schema_version >= 2 -> only analytics appear, and thus are allowed
rows = await resolve_permissions_from_catalog(
db,
{"id": "dev"},
plugins,
VIEW_TABLE,
candidate_sql,
candidate_params={"exclude_memory": 1, "min_schema_version": 2},
implicit_deny=True,
)
assert rows and all(r["parent"] == "analytics" for r in rows)
assert all(r["allow"] == 1 for r in rows)
# Case 2: include memory dbs, min_schema_version = None -> accounting/hr/analytics appear,
# but root deny wins except where specifically allowed (none except analytics parent allow doesnt apply to table depth if candidate includes children; still fine—policy is explicit).
rows2 = await resolve_permissions_from_catalog(
db,
{"id": "dev"},
plugins,
VIEW_TABLE,
candidate_sql,
candidate_params={"exclude_memory": 0, "min_schema_version": None},
implicit_deny=True,
)
assert any(r["parent"] == "accounting" for r in rows2)
assert any(r["parent"] == "hr" for r in rows2)
# For table-scoped candidates, the parent-level allow does not override root deny unless you have child-level rules
assert all(r["allow"] in (0, 1) for r in rows2)
@pytest.mark.asyncio
async def test_action_specific_rules(db):
await seed_catalog(db)
plugins = [plugin_allow_all_for_action("dana", VIEW_TABLE)]
view_rows = await resolve_permissions_from_catalog(
db,
{"id": "dana"},
plugins,
VIEW_TABLE,
TABLE_CANDIDATES_SQL,
implicit_deny=True,
)
assert view_rows and all(r["allow"] == 1 for r in view_rows)
assert all(r["action"] == VIEW_TABLE for r in view_rows)
insert_rows = await resolve_permissions_from_catalog(
db,
{"id": "dana"},
plugins,
"insert-row",
TABLE_CANDIDATES_SQL,
implicit_deny=True,
)
assert insert_rows and all(r["allow"] == 0 for r in insert_rows)
assert all(r["reason"] == "implicit deny" for r in insert_rows)
assert all(r["action"] == "insert-row" for r in insert_rows)
@pytest.mark.asyncio
async def test_actor_actor_id_action_parameters_available(db):
"""Test that :actor (JSON), :actor_id, and :action are all available in SQL"""
await seed_catalog(db)
def plugin_using_all_parameters() -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
return PermissionSQL(
"test_all_params",
"""
SELECT NULL AS parent, NULL AS child, 1 AS allow,
'Actor ID: ' || COALESCE(:actor_id, 'null') ||
', Actor JSON: ' || COALESCE(:actor, 'null') ||
', Action: ' || :action AS reason
WHERE :actor_id = 'test_user' AND :action = 'view-table'
AND json_extract(:actor, '$.role') = 'admin'
""",
{},
)
return provider
plugins = [plugin_using_all_parameters()]
# Test with full actor dict
rows = await resolve_permissions_from_catalog(
db,
{"id": "test_user", "role": "admin"},
plugins,
"view-table",
TABLE_CANDIDATES_SQL,
implicit_deny=True,
)
# Should have allowed rows with reason containing all the info
allowed = [r for r in rows if r["allow"] == 1]
assert len(allowed) > 0
# Check that the reason string contains evidence of all parameters
reason = allowed[0]["reason"]
assert "test_user" in reason
assert "view-table" in reason
# The :actor parameter should be the JSON string
assert "Actor JSON:" in reason
@pytest.mark.asyncio
async def test_multiple_plugins_with_own_parameters(db):
"""
Test that multiple plugins can use their own parameter names without conflict.
This verifies that the parameter naming convention works: plugins prefix their
parameters (e.g., :plugin1_pattern, :plugin2_message) and both sets of parameters
are successfully bound in the SQL queries.
"""
await seed_catalog(db)
def plugin_one() -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
if action != "view-table":
return PermissionSQL("plugin_one", "SELECT NULL WHERE 0", {})
return PermissionSQL(
"plugin_one",
"""
SELECT database_name AS parent, table_name AS child,
1 AS allow, 'Plugin one used param: ' || :plugin1_param AS reason
FROM catalog_tables
WHERE database_name = 'accounting'
""",
{
"plugin1_param": "value1",
},
)
return provider
def plugin_two() -> Callable[[str], PermissionSQL]:
def provider(action: str) -> PermissionSQL:
if action != "view-table":
return PermissionSQL("plugin_two", "SELECT NULL WHERE 0", {})
return PermissionSQL(
"plugin_two",
"""
SELECT database_name AS parent, table_name AS child,
1 AS allow, 'Plugin two used param: ' || :plugin2_param AS reason
FROM catalog_tables
WHERE database_name = 'hr'
""",
{
"plugin2_param": "value2",
},
)
return provider
plugins = [plugin_one(), plugin_two()]
rows = await resolve_permissions_from_catalog(
db,
{"id": "test_user"},
plugins,
"view-table",
TABLE_CANDIDATES_SQL,
implicit_deny=False,
)
# Both plugins should contribute results with their parameters successfully bound
plugin_one_rows = [
r for r in rows if r.get("reason") and "Plugin one" in r["reason"]
]
plugin_two_rows = [
r for r in rows if r.get("reason") and "Plugin two" in r["reason"]
]
assert len(plugin_one_rows) > 0, "Plugin one should contribute rules"
assert len(plugin_two_rows) > 0, "Plugin two should contribute rules"
# Verify each plugin's parameters were successfully bound in the SQL
assert any(
"value1" in r.get("reason", "") for r in plugin_one_rows
), "Plugin one's :plugin1_param should be bound"
assert any(
"value2" in r.get("reason", "") for r in plugin_two_rows
), "Plugin two's :plugin2_param should be bound"