Replace restrict_all/restrict_database/restrict_resource with TokenRestrictions dataclass

Consolidates the three separate restriction parameters into a single
TokenRestrictions dataclass in datasette.permissions. Updates all call
sites: Datasette.create_signed_token(), Datasette.create_token(),
build_token_restrictions(), the create_token hook spec, CreateTokenEvent,
the CLI, and the create-token view.

https://claude.ai/code/session_012TFFCamoYLTofV2vCgPrjV
This commit is contained in:
Claude 2026-02-25 03:21:29 +00:00
commit 4f89b77782
No known key found for this signature in database
9 changed files with 90 additions and 77 deletions

View file

@ -6,7 +6,7 @@ import contextvars
from typing import TYPE_CHECKING, Any, Dict, Iterable, List
if TYPE_CHECKING:
from datasette.permissions import Resource
from datasette.permissions import Resource, TokenRestrictions
import asgi_csrf
import collections
import dataclasses
@ -722,60 +722,56 @@ class Datasette:
def build_token_restrictions(
self,
restrict_all: Iterable[str] | None = None,
restrict_database: Dict[str, Iterable[str]] | None = None,
restrict_resource: Dict[str, Dict[str, Iterable[str]]] | None = None,
restrictions: "TokenRestrictions",
) -> dict | None:
"""Build an abbreviated restrictions dict for use in token payloads.
Returns a dict like ``{"a": [...], "d": {...}, "r": {...}}``
or ``None`` if there are no restrictions.
"""
if not (restrict_all or restrict_database or restrict_resource):
if not (restrictions.all or restrictions.database or restrictions.resource):
return None
restrictions: dict = {}
if restrict_all:
restrictions["a"] = [
self._abbreviate_action(a) for a in restrict_all
result: dict = {}
if restrictions.all:
result["a"] = [
self._abbreviate_action(a) for a in restrictions.all
]
if restrict_database:
restrictions["d"] = {
if restrictions.database:
result["d"] = {
database: [self._abbreviate_action(a) for a in actions]
for database, actions in restrict_database.items()
for database, actions in restrictions.database.items()
}
if restrict_resource:
restrictions["r"] = {}
for database, resources in restrict_resource.items():
if restrictions.resource:
result["r"] = {}
for database, resources in restrictions.resource.items():
for resource, actions in resources.items():
restrictions["r"].setdefault(database, {})[resource] = [
result["r"].setdefault(database, {})[resource] = [
self._abbreviate_action(a) for a in actions
]
return restrictions
return result
def create_signed_token(
self,
actor_id: str,
*,
expires_after: int | None = None,
restrict_all: Iterable[str] | None = None,
restrict_database: Dict[str, Iterable[str]] | None = None,
restrict_resource: Dict[str, Dict[str, Iterable[str]]] | None = None,
restrictions: "TokenRestrictions | None" = None,
) -> str:
"""Create a signed ``dstok_`` API token.
This always creates a signed token regardless of installed plugins.
Use :meth:`create_token` to go through the plugin hook instead.
"""
from datasette.permissions import TokenRestrictions
token: dict = {"a": actor_id, "t": int(time.time())}
if expires_after:
token["d"] = expires_after
restrictions = self.build_token_restrictions(
restrict_all=restrict_all,
restrict_database=restrict_database,
restrict_resource=restrict_resource,
)
if restrictions:
token["_r"] = restrictions
if restrictions is None:
restrictions = TokenRestrictions(all=[], database={}, resource={})
abbreviated = self.build_token_restrictions(restrictions)
if abbreviated:
token["_r"] = abbreviated
return "dstok_{}".format(self.sign(token, namespace="token"))
async def create_token(
@ -783,9 +779,7 @@ class Datasette:
actor_id: str,
*,
expires_after: int | None = None,
restrict_all: Iterable[str] | None = None,
restrict_database: Dict[str, Iterable[str]] | None = None,
restrict_resource: Dict[str, Dict[str, Iterable[str]]] | None = None,
restrictions: "TokenRestrictions | None" = None,
) -> str:
"""Create an API token, dispatching through the ``create_token`` hook.
@ -793,13 +787,15 @@ class Datasette:
a database-backed token instead of a signed one. The default
implementation creates a signed ``dstok_`` token.
"""
from datasette.permissions import TokenRestrictions
if restrictions is None:
restrictions = TokenRestrictions(all=[], database={}, resource={})
for result in pm.hook.create_token(
datasette=self,
actor_id=actor_id,
expires_after=expires_after,
restrict_all=restrict_all or [],
restrict_database=restrict_database or {},
restrict_resource=restrict_resource or {},
restrictions=restrictions,
):
result = await await_me_maybe(result)
if result is not None:

View file

@ -841,12 +841,17 @@ def create_token(
action
)
from datasette.permissions import TokenRestrictions
restrictions = TokenRestrictions(
all=alls,
database=restrict_database,
resource=restrict_resource,
)
token = ds.create_signed_token(
id,
expires_after=expires_after,
restrict_all=alls,
restrict_database=restrict_database,
restrict_resource=restrict_resource,
restrictions=restrictions,
)
click.echo(token)
if debug:

View file

@ -96,7 +96,7 @@ def actor_from_signed_api_token(datasette: "Datasette", request) -> Optional[dic
@hookimpl(trylast=True, specname="create_token")
def create_signed_api_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource):
def create_signed_api_token(datasette, actor_id, expires_after, restrictions):
"""Default create_token implementation: creates a signed dstok_ token.
Runs last so that plugins like datasette-auth-tokens can override
@ -105,7 +105,5 @@ def create_signed_api_token(datasette, actor_id, expires_after, restrict_all, re
return datasette.create_signed_token(
actor_id,
expires_after=expires_after,
restrict_all=restrict_all,
restrict_database=restrict_database,
restrict_resource=restrict_resource,
restrictions=restrictions,
)

View file

@ -53,19 +53,13 @@ class CreateTokenEvent(Event):
:ivar expires_after: Number of seconds after which this token will expire.
:type expires_after: int or None
:ivar restrict_all: Restricted permissions for this token.
:type restrict_all: list
:ivar restrict_database: Restricted database permissions for this token.
:type restrict_database: dict
:ivar restrict_resource: Restricted resource permissions for this token.
:type restrict_resource: dict
:ivar restrictions: Token restrictions (a :class:`TokenRestrictions` instance).
:type restrictions: TokenRestrictions
"""
name = "create-token"
expires_after: int | None
restrict_all: list
restrict_database: dict
restrict_resource: dict
restrictions: object # TokenRestrictions
@dataclass

View file

@ -245,7 +245,7 @@ def write_wrapper(datasette, database, request, transaction):
@hookspec
def create_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource):
def create_token(datasette, actor_id, expires_after, restrictions):
"""Create an API token for the given actor.
Return a token string, or ``None`` to let the next implementation
@ -255,10 +255,8 @@ def create_token(datasette, actor_id, expires_after, restrict_all, restrict_data
Parameters mirror ``Datasette.create_token()``:
- ``actor_id``: the actor ID to embed in the token.
- ``expires_after``: seconds until expiry, or ``None``.
- ``restrict_all``: list of action names to restrict globally.
- ``restrict_database``: ``{database: [actions]}`` restrictions.
- ``restrict_resource``: ``{database: {resource: [actions]}}``
restrictions.
- ``restrictions``: a :class:`TokenRestrictions` dataclass with
``all``, ``database``, and ``resource`` fields.
The default (``trylast``) implementation creates a signed
``dstok_`` token. Plugins like ``datasette-auth-tokens`` can

View file

@ -121,6 +121,19 @@ class AllowedResource(NamedTuple):
reason: str
@dataclass
class TokenRestrictions:
"""Restrictions that can be applied to an API token.
``all`` restricts globally, ``database`` restricts per-database,
and ``resource`` restricts per-resource within a database.
"""
all: list[str]
database: dict[str, list[str]]
resource: dict[str, dict[str, list[str]]]
@dataclass(frozen=True, kw_only=True)
class Action:
name: str

View file

@ -1,6 +1,7 @@
import json
import logging
from datasette.events import LogoutEvent, LoginEvent, CreateTokenEvent
from datasette.permissions import TokenRestrictions
from datasette.resources import DatabaseResource, TableResource
from datasette.utils.asgi import Response, Forbidden
from datasette.utils import (
@ -731,21 +732,22 @@ class CreateTokenView(BaseView):
resource, []
).append(action)
restrictions = TokenRestrictions(
all=restrict_all,
database=restrict_database,
resource=restrict_resource,
)
token = self.ds.create_signed_token(
request.actor["id"],
expires_after=expires_after,
restrict_all=restrict_all,
restrict_database=restrict_database,
restrict_resource=restrict_resource,
restrictions=restrictions,
)
token_bits = self.ds.unsign(token[len("dstok_") :], namespace="token")
await self.ds.track_event(
CreateTokenEvent(
actor=request.actor,
expires_after=expires_after,
restrict_all=restrict_all,
restrict_database=restrict_database,
restrict_resource=restrict_resource,
restrictions=restrictions,
)
)
context = await self.shared(request)

View file

@ -1,4 +1,5 @@
from datasette.app import Datasette
from datasette.permissions import TokenRestrictions
from datasette.utils import sqlite3
from .utils import last_event
import pytest
@ -1362,7 +1363,7 @@ async def test_create_table(
async def test_create_table_permissions(
ds_write, permissions, body, expected_status, expected_errors
):
token = ds_write.create_signed_token("root", restrict_all=["view-instance"] + permissions)
token = ds_write.create_signed_token("root", restrictions=TokenRestrictions(all=["view-instance"] + permissions, database={}, resource={}))
response = await ds_write.client.post(
"/data/-/create",
json=body,

View file

@ -1,6 +1,7 @@
from bs4 import BeautifulSoup as Soup
from .utils import cookie_was_deleted, last_event
from click.testing import CliRunner
from datasette.permissions import TokenRestrictions
from datasette.utils import baseconv
from datasette.cli import cli
from datasette.resources import (
@ -209,9 +210,7 @@ def test_auth_create_token(
event = last_event(app_client.ds)
assert event.name == "create-token"
assert event.expires_after == expected_duration
assert isinstance(event.restrict_all, list)
assert isinstance(event.restrict_database, dict)
assert isinstance(event.restrict_resource, dict)
assert isinstance(event.restrictions, TokenRestrictions)
# Extract token from page
token = response2.text.split('value="dstok_')[1].split('"')[0]
details = app_client.ds.unsign(token, "token")
@ -509,7 +508,11 @@ async def test_create_signed_token_with_restrictions(ds_client):
token = ds_client.ds.create_signed_token(
"test_actor",
expires_after=3600,
restrict_all=["view-instance"],
restrictions=TokenRestrictions(
all=["view-instance"],
database={},
resource={},
),
)
decoded = ds_client.ds.unsign(token[len("dstok_"):], namespace="token")
assert decoded["a"] == "test_actor"
@ -523,24 +526,24 @@ async def test_build_token_restrictions(ds_client):
"""build_token_restrictions() returns abbreviated restriction dicts"""
ds = ds_client.ds
# No restrictions returns None
assert ds.build_token_restrictions() is None
assert ds.build_token_restrictions(TokenRestrictions(all=[], database={}, resource={})) is None
# With restrict_all
result = ds.build_token_restrictions(restrict_all=["view-instance"])
# With all
result = ds.build_token_restrictions(TokenRestrictions(all=["view-instance"], database={}, resource={}))
assert result is not None
assert "a" in result
# With restrict_database
# With database
result = ds.build_token_restrictions(
restrict_database={"mydb": ["view-table"]}
TokenRestrictions(all=[], database={"mydb": ["view-table"]}, resource={})
)
assert result is not None
assert "d" in result
assert "mydb" in result["d"]
# With restrict_resource
# With resource
result = ds.build_token_restrictions(
restrict_resource={"mydb": {"mytable": ["insert-row"]}}
TokenRestrictions(all=[], database={}, resource={"mydb": {"mytable": ["insert-row"]}})
)
assert result is not None
assert "r" in result
@ -563,8 +566,11 @@ async def test_create_token_hook_default_with_restrictions(ds_client):
token = await ds_client.ds.create_token(
"test_actor",
expires_after=7200,
restrict_all=["view-instance"],
restrict_database={"fixtures": ["view-table"]},
restrictions=TokenRestrictions(
all=["view-instance"],
database={"fixtures": ["view-table"]},
resource={},
),
)
assert token.startswith("dstok_")
decoded = ds_client.ds.unsign(token[len("dstok_"):], namespace="token")
@ -584,7 +590,7 @@ async def test_create_token_hook_can_be_overridden(ds_client):
@staticmethod
@hookimpl(specname="create_token")
def custom_create_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource):
def custom_create_token(datasette, actor_id, expires_after, restrictions):
return "custom_token_for_{}".format(actor_id)
pm.register(CustomTokenPlugin, name="custom_token_test_plugin")
@ -606,7 +612,7 @@ async def test_create_token_hook_async_override(ds_client):
@staticmethod
@hookimpl(specname="create_token")
def async_create_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource):
def async_create_token(datasette, actor_id, expires_after, restrictions):
async def inner():
return "async_token_for_{}".format(actor_id)
return inner()
@ -630,7 +636,7 @@ async def test_create_token_hook_none_falls_through(ds_client):
@staticmethod
@hookimpl(specname="create_token")
def none_create_token(datasette, actor_id, expires_after, restrict_all, restrict_database, restrict_resource):
def none_create_token(datasette, actor_id, expires_after, restrictions):
return None
pm.register(NoneTokenPlugin, name="none_token_test_plugin")