From 4f89b7778287789a56906da512da7a943d0332cc Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 25 Feb 2026 03:21:29 +0000 Subject: [PATCH] Replace restrict_all/restrict_database/restrict_resource with TokenRestrictions dataclass Consolidates the three separate restriction parameters into a single TokenRestrictions dataclass in datasette.permissions. Updates all call sites: Datasette.create_signed_token(), Datasette.create_token(), build_token_restrictions(), the create_token hook spec, CreateTokenEvent, the CLI, and the create-token view. https://claude.ai/code/session_012TFFCamoYLTofV2vCgPrjV --- datasette/app.py | 62 ++++++++++++------------- datasette/cli.py | 11 +++-- datasette/default_permissions/tokens.py | 6 +-- datasette/events.py | 12 ++--- datasette/hookspecs.py | 8 ++-- datasette/permissions.py | 13 ++++++ datasette/views/special.py | 14 +++--- tests/test_api_write.py | 3 +- tests/test_auth.py | 38 ++++++++------- 9 files changed, 90 insertions(+), 77 deletions(-) diff --git a/datasette/app.py b/datasette/app.py index d4425cb4..458f63da 100644 --- a/datasette/app.py +++ b/datasette/app.py @@ -6,7 +6,7 @@ import contextvars from typing import TYPE_CHECKING, Any, Dict, Iterable, List if TYPE_CHECKING: - from datasette.permissions import Resource + from datasette.permissions import Resource, TokenRestrictions import asgi_csrf import collections import dataclasses @@ -722,60 +722,56 @@ class Datasette: def build_token_restrictions( self, - restrict_all: Iterable[str] | None = None, - restrict_database: Dict[str, Iterable[str]] | None = None, - restrict_resource: Dict[str, Dict[str, Iterable[str]]] | None = None, + restrictions: "TokenRestrictions", ) -> dict | None: """Build an abbreviated restrictions dict for use in token payloads. Returns a dict like ``{"a": [...], "d": {...}, "r": {...}}`` or ``None`` if there are no restrictions. """ - if not (restrict_all or restrict_database or restrict_resource): + if not (restrictions.all or restrictions.database or restrictions.resource): return None - restrictions: dict = {} - if restrict_all: - restrictions["a"] = [ - self._abbreviate_action(a) for a in restrict_all + result: dict = {} + if restrictions.all: + result["a"] = [ + self._abbreviate_action(a) for a in restrictions.all ] - if restrict_database: - restrictions["d"] = { + if restrictions.database: + result["d"] = { database: [self._abbreviate_action(a) for a in actions] - for database, actions in restrict_database.items() + for database, actions in restrictions.database.items() } - if restrict_resource: - restrictions["r"] = {} - for database, resources in restrict_resource.items(): + if restrictions.resource: + result["r"] = {} + for database, resources in restrictions.resource.items(): for resource, actions in resources.items(): - restrictions["r"].setdefault(database, {})[resource] = [ + result["r"].setdefault(database, {})[resource] = [ self._abbreviate_action(a) for a in actions ] - return restrictions + return result def create_signed_token( self, actor_id: str, *, expires_after: int | None = None, - restrict_all: Iterable[str] | None = None, - restrict_database: Dict[str, Iterable[str]] | None = None, - restrict_resource: Dict[str, Dict[str, Iterable[str]]] | None = None, + restrictions: "TokenRestrictions | None" = None, ) -> str: """Create a signed ``dstok_`` API token. This always creates a signed token regardless of installed plugins. Use :meth:`create_token` to go through the plugin hook instead. """ + from datasette.permissions import TokenRestrictions + token: dict = {"a": actor_id, "t": int(time.time())} if expires_after: token["d"] = expires_after - restrictions = self.build_token_restrictions( - restrict_all=restrict_all, - restrict_database=restrict_database, - restrict_resource=restrict_resource, - ) - if restrictions: - token["_r"] = restrictions + if restrictions is None: + restrictions = TokenRestrictions(all=[], database={}, resource={}) + abbreviated = self.build_token_restrictions(restrictions) + if abbreviated: + token["_r"] = abbreviated return "dstok_{}".format(self.sign(token, namespace="token")) async def create_token( @@ -783,9 +779,7 @@ class Datasette: actor_id: str, *, expires_after: int | None = None, - restrict_all: Iterable[str] | None = None, - restrict_database: Dict[str, Iterable[str]] | None = None, - restrict_resource: Dict[str, Dict[str, Iterable[str]]] | None = None, + restrictions: "TokenRestrictions | None" = None, ) -> str: """Create an API token, dispatching through the ``create_token`` hook. @@ -793,13 +787,15 @@ class Datasette: a database-backed token instead of a signed one. The default implementation creates a signed ``dstok_`` token. """ + from datasette.permissions import TokenRestrictions + + if restrictions is None: + restrictions = TokenRestrictions(all=[], database={}, resource={}) for result in pm.hook.create_token( datasette=self, actor_id=actor_id, expires_after=expires_after, - restrict_all=restrict_all or [], - restrict_database=restrict_database or {}, - restrict_resource=restrict_resource or {}, + restrictions=restrictions, ): result = await await_me_maybe(result) if result is not None: diff --git a/datasette/cli.py b/datasette/cli.py index d105715b..b8a46b12 100644 --- a/datasette/cli.py +++ b/datasette/cli.py @@ -841,12 +841,17 @@ def create_token( action ) + from datasette.permissions import TokenRestrictions + + restrictions = TokenRestrictions( + all=alls, + database=restrict_database, + resource=restrict_resource, + ) token = ds.create_signed_token( id, expires_after=expires_after, - restrict_all=alls, - restrict_database=restrict_database, - restrict_resource=restrict_resource, + restrictions=restrictions, ) click.echo(token) if debug: diff --git a/datasette/default_permissions/tokens.py b/datasette/default_permissions/tokens.py index f2769284..67e99e5a 100644 --- a/datasette/default_permissions/tokens.py +++ b/datasette/default_permissions/tokens.py @@ -96,7 +96,7 @@ def actor_from_signed_api_token(datasette: "Datasette", request) -> Optional[dic @hookimpl(trylast=True, specname="create_token") -def create_signed_api_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource): +def create_signed_api_token(datasette, actor_id, expires_after, restrictions): """Default create_token implementation: creates a signed dstok_ token. Runs last so that plugins like datasette-auth-tokens can override @@ -105,7 +105,5 @@ def create_signed_api_token(datasette, actor_id, expires_after, restrict_all, re return datasette.create_signed_token( actor_id, expires_after=expires_after, - restrict_all=restrict_all, - restrict_database=restrict_database, - restrict_resource=restrict_resource, + restrictions=restrictions, ) diff --git a/datasette/events.py b/datasette/events.py index 5cd5ba3d..35ddb085 100644 --- a/datasette/events.py +++ b/datasette/events.py @@ -53,19 +53,13 @@ class CreateTokenEvent(Event): :ivar expires_after: Number of seconds after which this token will expire. :type expires_after: int or None - :ivar restrict_all: Restricted permissions for this token. - :type restrict_all: list - :ivar restrict_database: Restricted database permissions for this token. - :type restrict_database: dict - :ivar restrict_resource: Restricted resource permissions for this token. - :type restrict_resource: dict + :ivar restrictions: Token restrictions (a :class:`TokenRestrictions` instance). + :type restrictions: TokenRestrictions """ name = "create-token" expires_after: int | None - restrict_all: list - restrict_database: dict - restrict_resource: dict + restrictions: object # TokenRestrictions @dataclass diff --git a/datasette/hookspecs.py b/datasette/hookspecs.py index 8903b14f..88457116 100644 --- a/datasette/hookspecs.py +++ b/datasette/hookspecs.py @@ -245,7 +245,7 @@ def write_wrapper(datasette, database, request, transaction): @hookspec -def create_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource): +def create_token(datasette, actor_id, expires_after, restrictions): """Create an API token for the given actor. Return a token string, or ``None`` to let the next implementation @@ -255,10 +255,8 @@ def create_token(datasette, actor_id, expires_after, restrict_all, restrict_data Parameters mirror ``Datasette.create_token()``: - ``actor_id``: the actor ID to embed in the token. - ``expires_after``: seconds until expiry, or ``None``. - - ``restrict_all``: list of action names to restrict globally. - - ``restrict_database``: ``{database: [actions]}`` restrictions. - - ``restrict_resource``: ``{database: {resource: [actions]}}`` - restrictions. + - ``restrictions``: a :class:`TokenRestrictions` dataclass with + ``all``, ``database``, and ``resource`` fields. The default (``trylast``) implementation creates a signed ``dstok_`` token. Plugins like ``datasette-auth-tokens`` can diff --git a/datasette/permissions.py b/datasette/permissions.py index b5e72b8e..dd2db966 100644 --- a/datasette/permissions.py +++ b/datasette/permissions.py @@ -121,6 +121,19 @@ class AllowedResource(NamedTuple): reason: str +@dataclass +class TokenRestrictions: + """Restrictions that can be applied to an API token. + + ``all`` restricts globally, ``database`` restricts per-database, + and ``resource`` restricts per-resource within a database. + """ + + all: list[str] + database: dict[str, list[str]] + resource: dict[str, dict[str, list[str]]] + + @dataclass(frozen=True, kw_only=True) class Action: name: str diff --git a/datasette/views/special.py b/datasette/views/special.py index 80040d20..e8962873 100644 --- a/datasette/views/special.py +++ b/datasette/views/special.py @@ -1,6 +1,7 @@ import json import logging from datasette.events import LogoutEvent, LoginEvent, CreateTokenEvent +from datasette.permissions import TokenRestrictions from datasette.resources import DatabaseResource, TableResource from datasette.utils.asgi import Response, Forbidden from datasette.utils import ( @@ -731,21 +732,22 @@ class CreateTokenView(BaseView): resource, [] ).append(action) + restrictions = TokenRestrictions( + all=restrict_all, + database=restrict_database, + resource=restrict_resource, + ) token = self.ds.create_signed_token( request.actor["id"], expires_after=expires_after, - restrict_all=restrict_all, - restrict_database=restrict_database, - restrict_resource=restrict_resource, + restrictions=restrictions, ) token_bits = self.ds.unsign(token[len("dstok_") :], namespace="token") await self.ds.track_event( CreateTokenEvent( actor=request.actor, expires_after=expires_after, - restrict_all=restrict_all, - restrict_database=restrict_database, - restrict_resource=restrict_resource, + restrictions=restrictions, ) ) context = await self.shared(request) diff --git a/tests/test_api_write.py b/tests/test_api_write.py index afc853f5..ecde33c4 100644 --- a/tests/test_api_write.py +++ b/tests/test_api_write.py @@ -1,4 +1,5 @@ from datasette.app import Datasette +from datasette.permissions import TokenRestrictions from datasette.utils import sqlite3 from .utils import last_event import pytest @@ -1362,7 +1363,7 @@ async def test_create_table( async def test_create_table_permissions( ds_write, permissions, body, expected_status, expected_errors ): - token = ds_write.create_signed_token("root", restrict_all=["view-instance"] + permissions) + token = ds_write.create_signed_token("root", restrictions=TokenRestrictions(all=["view-instance"] + permissions, database={}, resource={})) response = await ds_write.client.post( "/data/-/create", json=body, diff --git a/tests/test_auth.py b/tests/test_auth.py index cda09e08..4459d6d1 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,6 +1,7 @@ from bs4 import BeautifulSoup as Soup from .utils import cookie_was_deleted, last_event from click.testing import CliRunner +from datasette.permissions import TokenRestrictions from datasette.utils import baseconv from datasette.cli import cli from datasette.resources import ( @@ -209,9 +210,7 @@ def test_auth_create_token( event = last_event(app_client.ds) assert event.name == "create-token" assert event.expires_after == expected_duration - assert isinstance(event.restrict_all, list) - assert isinstance(event.restrict_database, dict) - assert isinstance(event.restrict_resource, dict) + assert isinstance(event.restrictions, TokenRestrictions) # Extract token from page token = response2.text.split('value="dstok_')[1].split('"')[0] details = app_client.ds.unsign(token, "token") @@ -509,7 +508,11 @@ async def test_create_signed_token_with_restrictions(ds_client): token = ds_client.ds.create_signed_token( "test_actor", expires_after=3600, - restrict_all=["view-instance"], + restrictions=TokenRestrictions( + all=["view-instance"], + database={}, + resource={}, + ), ) decoded = ds_client.ds.unsign(token[len("dstok_"):], namespace="token") assert decoded["a"] == "test_actor" @@ -523,24 +526,24 @@ async def test_build_token_restrictions(ds_client): """build_token_restrictions() returns abbreviated restriction dicts""" ds = ds_client.ds # No restrictions returns None - assert ds.build_token_restrictions() is None + assert ds.build_token_restrictions(TokenRestrictions(all=[], database={}, resource={})) is None - # With restrict_all - result = ds.build_token_restrictions(restrict_all=["view-instance"]) + # With all + result = ds.build_token_restrictions(TokenRestrictions(all=["view-instance"], database={}, resource={})) assert result is not None assert "a" in result - # With restrict_database + # With database result = ds.build_token_restrictions( - restrict_database={"mydb": ["view-table"]} + TokenRestrictions(all=[], database={"mydb": ["view-table"]}, resource={}) ) assert result is not None assert "d" in result assert "mydb" in result["d"] - # With restrict_resource + # With resource result = ds.build_token_restrictions( - restrict_resource={"mydb": {"mytable": ["insert-row"]}} + TokenRestrictions(all=[], database={}, resource={"mydb": {"mytable": ["insert-row"]}}) ) assert result is not None assert "r" in result @@ -563,8 +566,11 @@ async def test_create_token_hook_default_with_restrictions(ds_client): token = await ds_client.ds.create_token( "test_actor", expires_after=7200, - restrict_all=["view-instance"], - restrict_database={"fixtures": ["view-table"]}, + restrictions=TokenRestrictions( + all=["view-instance"], + database={"fixtures": ["view-table"]}, + resource={}, + ), ) assert token.startswith("dstok_") decoded = ds_client.ds.unsign(token[len("dstok_"):], namespace="token") @@ -584,7 +590,7 @@ async def test_create_token_hook_can_be_overridden(ds_client): @staticmethod @hookimpl(specname="create_token") - def custom_create_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource): + def custom_create_token(datasette, actor_id, expires_after, restrictions): return "custom_token_for_{}".format(actor_id) pm.register(CustomTokenPlugin, name="custom_token_test_plugin") @@ -606,7 +612,7 @@ async def test_create_token_hook_async_override(ds_client): @staticmethod @hookimpl(specname="create_token") - def async_create_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource): + def async_create_token(datasette, actor_id, expires_after, restrictions): async def inner(): return "async_token_for_{}".format(actor_id) return inner() @@ -630,7 +636,7 @@ async def test_create_token_hook_none_falls_through(ds_client): @staticmethod @hookimpl(specname="create_token") - def none_create_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource): + def none_create_token(datasette, actor_id, expires_after, restrictions): return None pm.register(NoneTokenPlugin, name="none_token_test_plugin")