diff --git a/datasette/default_permissions.py b/datasette/default_permissions.py index ab2f6312..3c295470 100644 --- a/datasette/default_permissions.py +++ b/datasette/default_permissions.py @@ -9,6 +9,7 @@ import time @hookimpl(tryfirst=True, specname="permission_allowed") def permission_allowed_default(datasette, actor, action, resource): async def inner(): + # id=root gets some special permissions: if action in ( "permissions-debug", "debug-menu", @@ -20,45 +21,72 @@ def permission_allowed_default(datasette, actor, action, resource): ): if actor and actor.get("id") == "root": return True - elif action == "view-instance": - allow = datasette.metadata("allow") - if allow is not None: - return actor_matches_allow(actor, allow) - elif action == "view-database": - if resource == "_internal" and (actor is None or actor.get("id") != "root"): - return False - database_allow = datasette.metadata("allow", database=resource) - if database_allow is None: - return None - return actor_matches_allow(actor, database_allow) - elif action == "view-table": - database, table = resource - tables = datasette.metadata("tables", database=database) or {} - table_allow = (tables.get(table) or {}).get("allow") - if table_allow is None: - return None - return actor_matches_allow(actor, table_allow) - elif action == "view-query": - # Check if this query has a "allow" block in metadata - database, query_name = resource - query = await datasette.get_canned_query(database, query_name, actor) - assert query is not None - allow = query.get("allow") - if allow is None: - return None - return actor_matches_allow(actor, allow) - elif action == "execute-sql": - # Use allow_sql block from database block, or from top-level - database_allow_sql = datasette.metadata("allow_sql", database=resource) - if database_allow_sql is None: - database_allow_sql = datasette.metadata("allow_sql") - if database_allow_sql is None: - return None - return actor_matches_allow(actor, database_allow_sql) + + # Resolve metadata view permissions + if action in ( + "view-instance", + "view-database", + "view-table", + "view-query", + "execute-sql", + ): + result = await _resolve_metadata_view_permissions( + datasette, actor, action, resource + ) + if result is not None: + return result + + # Check custom permissions: blocks + return await _resolve_metadata_permissions_blocks( + datasette, actor, action, resource + ) return inner +async def _resolve_metadata_permissions_blocks(datasette, actor, action, resource): + # Check custom permissions: blocks - not yet implemented + return None + + +async def _resolve_metadata_view_permissions(datasette, actor, action, resource): + if action == "view-instance": + allow = datasette.metadata("allow") + if allow is not None: + return actor_matches_allow(actor, allow) + elif action == "view-database": + if resource == "_internal" and (actor is None or actor.get("id") != "root"): + return False + database_allow = datasette.metadata("allow", database=resource) + if database_allow is None: + return None + return actor_matches_allow(actor, database_allow) + elif action == "view-table": + database, table = resource + tables = datasette.metadata("tables", database=database) or {} + table_allow = (tables.get(table) or {}).get("allow") + if table_allow is None: + return None + return actor_matches_allow(actor, table_allow) + elif action == "view-query": + # Check if this query has a "allow" block in metadata + database, query_name = resource + query = await datasette.get_canned_query(database, query_name, actor) + assert query is not None + allow = query.get("allow") + if allow is None: + return None + return actor_matches_allow(actor, allow) + elif action == "execute-sql": + # Use allow_sql block from database block, or from top-level + database_allow_sql = datasette.metadata("allow_sql", database=resource) + if database_allow_sql is None: + database_allow_sql = datasette.metadata("allow_sql") + if database_allow_sql is None: + return None + return actor_matches_allow(actor, database_allow_sql) + + @hookimpl(specname="permission_allowed") def permission_allowed_actor_restrictions(actor, action, resource): if actor is None: diff --git a/tests/test_permissions.py b/tests/test_permissions.py index 4eb18cee..f414216c 100644 --- a/tests/test_permissions.py +++ b/tests/test_permissions.py @@ -1,3 +1,4 @@ +import collections from datasette.app import Datasette from .fixtures import app_client, assert_permissions_checked, make_app_client from bs4 import BeautifulSoup as Soup @@ -640,3 +641,48 @@ async def test_actor_restricted_permissions( "result": expected_result, } assert response.json() == expected + + +PermMetadataTestCase = collections.namedtuple( + "PermMetadataTestCase", + "metadata,actor,action,resource,default,expected_result", +) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "metadata,actor,action,resource,default,expected_result", + ( + # Simple view-instance default=True example + PermMetadataTestCase( + metadata={}, + actor=None, + action="view-instance", + resource=None, + default=True, + expected_result=True, + ), + # debug-menu on root + PermMetadataTestCase( + metadata={"permissions": {"debug-menu": {"id": "user"}}}, + actor={"id": "user"}, + action="debug-menu", + resource=None, + default=False, + expected_result=True, + ), + ), +) +async def test_permissions_in_metadata( + perms_ds, metadata, actor, action, resource, default, expected_result +): + previous_metadata = perms_ds.metadata() + updated_metadata = copy.deepcopy(previous_metadata) + updated_metadata.update(metadata) + try: + result = await perms_ds.permission_allowed( + actor, action, resource, default=default + ) + assert result == expected_result + finally: + perms_ds._metadata_local = previous_metadata