diff --git a/datasette/default_permissions.py b/datasette/default_permissions.py index 1fc85ebf..41e1ea7f 100644 --- a/datasette/default_permissions.py +++ b/datasette/default_permissions.py @@ -12,60 +12,61 @@ import itsdangerous import time -@hookimpl -async def permission_resources_sql(datasette, actor, action): - rules: list[PermissionSQL] = [] +@hookimpl(specname="permission_resources_sql") +async def actor_restrictions_sql(datasette, actor, action): + """Handle actor restriction-based permission rules (_r key).""" + if not actor: + return None + return await _restriction_permission_rules(datasette, actor, action) - # 1. FIRST: Actor restrictions (if present) - # These act as a gating filter - must pass through before other checks - restriction_rules = await _restriction_permission_rules(datasette, actor, action) - rules.extend(restriction_rules) - # 2. Root user permissions - # Root user with root_enabled gets all permissions at global level - # Config rules at more specific levels (database/table) can still override +@hookimpl(specname="permission_resources_sql") +async def root_user_permissions_sql(datasette, actor, action): + """Grant root user full permissions when enabled.""" if datasette.root_enabled and actor and actor.get("id") == "root": # Add a single global-level allow rule (NULL, NULL) for root # This allows root to access everything by default, but database-level # and table-level deny rules in config can still block specific resources - rules.append(PermissionSQL.allow(reason="root user")) + return PermissionSQL.allow(reason="root user") + return None - # 3. Config-based permission rules - config_rules = await _config_permission_rules(datasette, actor, action) - rules.extend(config_rules) - # 4. Check default_allow_sql setting for execute-sql action +@hookimpl(specname="permission_resources_sql") +async def config_permissions_sql(datasette, actor, action): + """Apply config-based permission rules from datasette.yaml.""" + return await _config_permission_rules(datasette, actor, action) + + +@hookimpl(specname="permission_resources_sql") +async def default_allow_sql_check(datasette, actor, action): + """Enforce default_allow_sql setting for execute-sql action.""" if action == "execute-sql" and not datasette.setting("default_allow_sql"): - # Return a deny rule for all databases - rules.append(PermissionSQL.deny(reason="default_allow_sql is false")) - # Early return - don't add default allow rule - if not rules: - return None - if len(rules) == 1: - return rules[0] - return rules + return PermissionSQL.deny(reason="default_allow_sql is false") + return None - # 5. Default allow actions (ONLY if no restrictions) + +@hookimpl(specname="permission_resources_sql") +async def default_action_permissions_sql(datasette, actor, action): + """Apply default allow rules for standard view/execute actions.""" + # Only apply defaults if actor has no restrictions # If actor has restrictions, they've already added their own deny/allow rules has_restrictions = actor and "_r" in actor - if not has_restrictions: - default_allow_actions = { - "view-instance", - "view-database", - "view-database-download", - "view-table", - "view-query", - "execute-sql", - } - if action in default_allow_actions: - reason = f"default allow for {action}".replace("'", "''") - rules.append(PermissionSQL.allow(reason=reason)) - - if not rules: + if has_restrictions: return None - if len(rules) == 1: - return rules[0] - return rules + + default_allow_actions = { + "view-instance", + "view-database", + "view-database-download", + "view-table", + "view-query", + "execute-sql", + } + if action in default_allow_actions: + reason = f"default allow for {action}".replace("'", "''") + return PermissionSQL.allow(reason=reason) + + return None async def _config_permission_rules(datasette, actor, action) -> list[PermissionSQL]: diff --git a/tests/test_actions_sql.py b/tests/test_actions_sql.py index 19d44528..734a427d 100644 --- a/tests/test_actions_sql.py +++ b/tests/test_actions_sql.py @@ -315,62 +315,3 @@ async def test_sql_does_filtering_not_python(test_ds): finally: pm.unregister(plugin, name="test_plugin") - - -@pytest.mark.asyncio -async def test_no_permission_rules_returns_correct_schema(): - """ - Test that when no permission rules exist, the empty result has correct schema. - - This is a regression test for a bug where the empty result returned only - 2 columns (parent, child) instead of the documented 3 columns - (parent, child, reason), causing schema mismatches. - - See: https://github.com/simonw/datasette/pull/2515#discussion_r2457803901 - """ - from datasette.utils.actions_sql import build_allowed_resources_sql - - # Create a fresh datasette instance - ds = Datasette() - await ds.invoke_startup() - - # Add a test database - db = ds.add_memory_database("testdb") - await db.execute_write( - "CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY)" - ) - await ds._refresh_schemas() - - # Temporarily unregister all permission_resources_sql providers to simulate no rules - hook_caller = pm.hook.permission_resources_sql - hookimpls = hook_caller.get_hookimpls() - removed_plugins = [ - (impl.plugin_name, impl.plugin) for impl in hookimpls if impl.plugin is not None - ] - - for plugin_name, _ in removed_plugins: - pm.unregister(name=plugin_name) - - try: - # Call build_allowed_resources_sql directly which will hit the no-rules code path - sql, params = await build_allowed_resources_sql( - ds, actor={"id": "nobody"}, action="view-table" - ) - - # Execute the query to verify it has correct column structure - result = await ds.get_internal_database().execute(sql, params) - - # Should have 3 columns: parent, child, reason - # This assertion would fail if the empty result only had 2 columns - assert ( - len(result.columns) == 3 - ), f"Expected 3 columns, got {len(result.columns)}: {result.columns}" - assert result.columns == ["parent", "child", "reason"] - - # Should have no rows (no rules = no access) - assert len(result.rows) == 0 - - finally: - # Restore original plugins in the order they were removed - for plugin_name, plugin in removed_plugins: - pm.register(plugin, name=plugin_name)