Refactor default_permissions.py to help with implementation of #1636

This commit is contained in:
Simon Willison 2022-12-08 14:44:27 -08:00
commit 94be9953c5
2 changed files with 109 additions and 35 deletions

View file

@ -9,6 +9,7 @@ import time
@hookimpl(tryfirst=True, specname="permission_allowed") @hookimpl(tryfirst=True, specname="permission_allowed")
def permission_allowed_default(datasette, actor, action, resource): def permission_allowed_default(datasette, actor, action, resource):
async def inner(): async def inner():
# id=root gets some special permissions:
if action in ( if action in (
"permissions-debug", "permissions-debug",
"debug-menu", "debug-menu",
@ -20,45 +21,72 @@ def permission_allowed_default(datasette, actor, action, resource):
): ):
if actor and actor.get("id") == "root": if actor and actor.get("id") == "root":
return True return True
elif action == "view-instance":
allow = datasette.metadata("allow") # Resolve metadata view permissions
if allow is not None: if action in (
return actor_matches_allow(actor, allow) "view-instance",
elif action == "view-database": "view-database",
if resource == "_internal" and (actor is None or actor.get("id") != "root"): "view-table",
return False "view-query",
database_allow = datasette.metadata("allow", database=resource) "execute-sql",
if database_allow is None: ):
return None result = await _resolve_metadata_view_permissions(
return actor_matches_allow(actor, database_allow) datasette, actor, action, resource
elif action == "view-table": )
database, table = resource if result is not None:
tables = datasette.metadata("tables", database=database) or {} return result
table_allow = (tables.get(table) or {}).get("allow")
if table_allow is None: # Check custom permissions: blocks
return None return await _resolve_metadata_permissions_blocks(
return actor_matches_allow(actor, table_allow) datasette, actor, action, resource
elif action == "view-query": )
# Check if this query has a "allow" block in metadata
database, query_name = resource
query = await datasette.get_canned_query(database, query_name, actor)
assert query is not None
allow = query.get("allow")
if allow is None:
return None
return actor_matches_allow(actor, allow)
elif action == "execute-sql":
# Use allow_sql block from database block, or from top-level
database_allow_sql = datasette.metadata("allow_sql", database=resource)
if database_allow_sql is None:
database_allow_sql = datasette.metadata("allow_sql")
if database_allow_sql is None:
return None
return actor_matches_allow(actor, database_allow_sql)
return inner return inner
async def _resolve_metadata_permissions_blocks(datasette, actor, action, resource):
# Check custom permissions: blocks - not yet implemented
return None
async def _resolve_metadata_view_permissions(datasette, actor, action, resource):
if action == "view-instance":
allow = datasette.metadata("allow")
if allow is not None:
return actor_matches_allow(actor, allow)
elif action == "view-database":
if resource == "_internal" and (actor is None or actor.get("id") != "root"):
return False
database_allow = datasette.metadata("allow", database=resource)
if database_allow is None:
return None
return actor_matches_allow(actor, database_allow)
elif action == "view-table":
database, table = resource
tables = datasette.metadata("tables", database=database) or {}
table_allow = (tables.get(table) or {}).get("allow")
if table_allow is None:
return None
return actor_matches_allow(actor, table_allow)
elif action == "view-query":
# Check if this query has a "allow" block in metadata
database, query_name = resource
query = await datasette.get_canned_query(database, query_name, actor)
assert query is not None
allow = query.get("allow")
if allow is None:
return None
return actor_matches_allow(actor, allow)
elif action == "execute-sql":
# Use allow_sql block from database block, or from top-level
database_allow_sql = datasette.metadata("allow_sql", database=resource)
if database_allow_sql is None:
database_allow_sql = datasette.metadata("allow_sql")
if database_allow_sql is None:
return None
return actor_matches_allow(actor, database_allow_sql)
@hookimpl(specname="permission_allowed") @hookimpl(specname="permission_allowed")
def permission_allowed_actor_restrictions(actor, action, resource): def permission_allowed_actor_restrictions(actor, action, resource):
if actor is None: if actor is None:

View file

@ -1,3 +1,4 @@
import collections
from datasette.app import Datasette from datasette.app import Datasette
from .fixtures import app_client, assert_permissions_checked, make_app_client from .fixtures import app_client, assert_permissions_checked, make_app_client
from bs4 import BeautifulSoup as Soup from bs4 import BeautifulSoup as Soup
@ -640,3 +641,48 @@ async def test_actor_restricted_permissions(
"result": expected_result, "result": expected_result,
} }
assert response.json() == expected assert response.json() == expected
PermMetadataTestCase = collections.namedtuple(
"PermMetadataTestCase",
"metadata,actor,action,resource,default,expected_result",
)
@pytest.mark.asyncio
@pytest.mark.parametrize(
"metadata,actor,action,resource,default,expected_result",
(
# Simple view-instance default=True example
PermMetadataTestCase(
metadata={},
actor=None,
action="view-instance",
resource=None,
default=True,
expected_result=True,
),
# debug-menu on root
PermMetadataTestCase(
metadata={"permissions": {"debug-menu": {"id": "user"}}},
actor={"id": "user"},
action="debug-menu",
resource=None,
default=False,
expected_result=True,
),
),
)
async def test_permissions_in_metadata(
perms_ds, metadata, actor, action, resource, default, expected_result
):
previous_metadata = perms_ds.metadata()
updated_metadata = copy.deepcopy(previous_metadata)
updated_metadata.update(metadata)
try:
result = await perms_ds.permission_allowed(
actor, action, resource, default=default
)
assert result == expected_result
finally:
perms_ds._metadata_local = previous_metadata