register_token_handler() plugin hook for custom API token backends (#2650)

Closes #2649

* Add register_token_handler plugin hook for pluggable token backends

Adds a new register_token_handler hook that allows plugins to provide
custom token creation and verification backends. This enables plugins
like datasette-oauth to issue tokens without depending on specific
backend plugins like datasette-auth-tokens.

Key changes:
- New datasette/tokens.py with TokenHandler base class and SignedTokenHandler
  (the default signed-token implementation moved here)
- New register_token_handler hookspec in hookspecs.py
- Datasette.create_token() is now async and delegates to token handlers
- New Datasette.verify_token() method tries all handlers in sequence
- handler= parameter on create_token() to select a specific backend
- TokenHandler exported from datasette package for plugin use
- Fixed actor_from_request loop to await all coroutines (avoids warnings)

* Add documentation and hook test for register_token_handler

Fixes CI failures: the new hook needs a section in docs/plugin_hooks.rst
(checked by test_plugin_hooks_are_documented) and a test_hook_* function
in test_plugins.py (checked by test_plugin_hooks_have_tests).

* Register tokens module as separate default plugin

Instead of re-exporting hookimpls from default_permissions/__init__.py,
register datasette.default_permissions.tokens as its own DEFAULT_PLUGINS
entry. Cleaner and avoids confusing import-for-side-effect patterns.

* Replace restrict_x params with TokenRestrictions dataclass

Consolidates the three separate restrict_all, restrict_database, and
restrict_resource parameters into a single TokenRestrictions dataclass.
Cleaner API surface for both Datasette.create_token() and
TokenHandler.create_token().

Also clarifies docs re: default handler selection via pluggy ordering.

* Add builder methods to TokenRestrictions

Adds allow_all(), allow_database(), and allow_resource() methods that
return self for chaining. Callers no longer need to manipulate nested
dicts directly:

    restrictions = (TokenRestrictions()
        .allow_all("view-instance")
        .allow_database("mydb", "create-table")
        .allow_resource("mydb", "mytable", "insert-row"))

* docs: add 1.0a25 upgrade guide section for create_token() signature change

Ref: https://github.com/simonw/datasette/issues/2649#issuecomment-3962639393

* docs: note that create_token() is now async in upgrade guide

* docs: update internals, plugin_hooks, authentication for new token API

- internals.rst: new async create_token() signature with restrictions
  and handler params, add TokenRestrictions reference docs
- plugin_hooks.rst: show full create_token signature in TokenHandler
  example, note list returns and error cases
- authentication.rst: cross-reference TokenRestrictions from the
  restrictions section

* style: apply black formatting to token handler files

* docs: fix RST heading underline length in internals.rst

* tests: add restrictions round-trip and expiration tests for token handler

Covers allow_database/allow_resource builders, _r payload encoding,
and token_expires in verified actors. Coverage 76% -> 90%.

* tests: add test for signed tokens disabled

* fix: add TokenRestrictions TYPE_CHECKING import to fix ruff F821

* docs: regenerate plugins.rst with cog

* docs: reformat code blocks in plugin_hooks.rst with blacken-docs

* docs: add await .verify_token() to internals.rst

* tests: rewrite register_token_handler test to use real plugin handler

Adds a HardcodedTokenHandler to the test plugins dir that creates
tokens like dstok_hardcoded_token_1. The test now exercises creating
tokens via the default handler (which is the plugin's hardcoded one),
by explicitly naming the hardcoded handler, and by explicitly naming
the signed handler -- then verifies each token round-trips correctly.

* tests: clarify test_token_handler_via_http tests the default signed handler

* fix: use handler="signed" explicitly where signed tokens are expected

The HardcodedTokenHandler in my_plugin.py gets globally registered,
so create_token() without a handler name picks it up as the default.
Fix the create-token view, CLI, and tests to explicitly request the
signed handler where they depend on signed token behavior.

* fix: use handler="signed" in test_create_table_permissions

https://claude.ai/code/session_013cQFiDQjYRrRBH2biFfKuS
This commit is contained in:
Simon Willison 2026-02-25 16:32:45 -08:00 committed by GitHub
commit c96dc5ce26
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 840 additions and 165 deletions

View file

@ -1,6 +1,7 @@
from datasette.permissions import Permission # noqa
from datasette.version import __version_info__, __version__ # noqa
from datasette.events import Event # noqa
from datasette.tokens import TokenHandler, TokenRestrictions # noqa
from datasette.utils.asgi import Forbidden, NotFound, Request, Response # noqa
from datasette.utils import actor_matches_allow # noqa
from datasette.views import Context # noqa

View file

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, Dict, Iterable, List
if TYPE_CHECKING:
from datasette.permissions import Resource
from datasette.tokens import TokenRestrictions
import asgi_csrf
import collections
import dataclasses
@ -713,44 +714,70 @@ class Datasette:
"""
return _in_datasette_client.get()
def create_token(
def _token_handlers(self):
"""Collect all registered token handlers from plugins."""
from datasette.tokens import TokenHandler
handlers = []
for result in pm.hook.register_token_handler(datasette=self):
if isinstance(result, TokenHandler):
handlers.append(result)
elif isinstance(result, list):
handlers.extend(h for h in result if isinstance(h, TokenHandler))
return handlers
async def create_token(
self,
actor_id: str,
*,
expires_after: int | None = None,
restrict_all: Iterable[str] | None = None,
restrict_database: Dict[str, Iterable[str]] | None = None,
restrict_resource: Dict[str, Dict[str, Iterable[str]]] | None = None,
):
token = {"a": actor_id, "t": int(time.time())}
if expires_after:
token["d"] = expires_after
restrictions: "TokenRestrictions | None" = None,
handler: str | None = None,
) -> str:
"""
Create an API token for the given actor.
def abbreviate_action(action):
# rename to abbr if possible
action_obj = self.actions.get(action)
if not action_obj:
return action
return action_obj.abbr or action
Uses the first registered token handler by default, or a specific
handler if ``handler`` is provided (matched by handler name).
if expires_after:
token["d"] = expires_after
if restrict_all or restrict_database or restrict_resource:
token["_r"] = {}
if restrict_all:
token["_r"]["a"] = [abbreviate_action(a) for a in restrict_all]
if restrict_database:
token["_r"]["d"] = {}
for database, actions in restrict_database.items():
token["_r"]["d"][database] = [abbreviate_action(a) for a in actions]
if restrict_resource:
token["_r"]["r"] = {}
for database, resources in restrict_resource.items():
for resource, actions in resources.items():
token["_r"]["r"].setdefault(database, {})[resource] = [
abbreviate_action(a) for a in actions
]
return "dstok_{}".format(self.sign(token, namespace="token"))
Pass a :class:`TokenRestrictions` to limit which actions the token
can perform.
"""
handlers = self._token_handlers()
if not handlers:
raise RuntimeError("No token handlers are registered")
if handler is not None:
matched = [h for h in handlers if h.name == handler]
if not matched:
available = [h.name for h in handlers]
raise ValueError(
f"Token handler {handler!r} not found. "
f"Available handlers: {available}"
)
chosen = matched[0]
else:
chosen = handlers[0]
return await chosen.create_token(
self,
actor_id,
expires_after=expires_after,
restrictions=restrictions,
)
async def verify_token(self, token: str) -> dict | None:
"""
Verify an API token by trying all registered token handlers.
Returns an actor dict from the first handler that recognizes the
token, or None if no handler accepts it.
"""
for token_handler in self._token_handlers():
result = await token_handler.verify_token(self, token)
if result is not None:
return result
return None
def get_database(self, name=None, route=None):
if route is not None:
@ -2159,10 +2186,13 @@ class DatasetteRouter:
# Handle authentication
default_actor = scope.get("actor") or None
actor = None
for actor in pm.hook.actor_from_request(datasette=self.ds, request=request):
actor = await await_me_maybe(actor)
if actor:
break
results = pm.hook.actor_from_request(datasette=self.ds, request=request)
for result in results:
result = await await_me_maybe(result)
if result and actor is None:
actor = result
# Don't break — we must await all coroutines to avoid
# "coroutine was never awaited" warnings
scope_modifications["actor"] = actor or default_actor
scope = dict(scope, **scope_modifications)

View file

@ -832,21 +832,23 @@ def create_token(
err=True,
)
restrict_database = {}
for database, action in databases:
restrict_database.setdefault(database, []).append(action)
restrict_resource = {}
for database, resource, action in resources:
restrict_resource.setdefault(database, {}).setdefault(resource, []).append(
action
)
from datasette.tokens import TokenRestrictions
token = ds.create_token(
id,
expires_after=expires_after,
restrict_all=alls,
restrict_database=restrict_database,
restrict_resource=restrict_resource,
restrictions = TokenRestrictions()
for action in alls:
restrictions.allow_all(action)
for database, action in databases:
restrictions.allow_database(database, action)
for database, resource, action in resources:
restrictions.allow_resource(database, resource, action)
token = run_sync(
lambda: ds.create_token(
id,
expires_after=expires_after,
restrictions=restrictions,
handler="signed",
)
)
click.echo(token)
if debug:

View file

@ -37,7 +37,6 @@ from .defaults import (
default_action_permissions_sql as default_action_permissions_sql,
DEFAULT_ALLOW_ACTIONS as DEFAULT_ALLOW_ACTIONS,
)
from .tokens import actor_from_signed_api_token as actor_from_signed_api_token
@hookimpl

View file

@ -1,44 +1,35 @@
"""
Token authentication for Datasette.
Handles signed API tokens (dstok_ prefix).
Registers the default SignedTokenHandler and delegates token verification
to datasette.verify_token() so all registered handlers are tried.
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
import itsdangerous
from datasette import hookimpl
from datasette.tokens import SignedTokenHandler
@hookimpl
def register_token_handler(datasette: "Datasette"):
"""Register the default signed token handler."""
return SignedTokenHandler()
@hookimpl(specname="actor_from_request")
def actor_from_signed_api_token(datasette: "Datasette", request) -> Optional[dict]:
async def actor_from_signed_api_token(
datasette: "Datasette", request
) -> Optional[dict]:
"""
Authenticate requests using signed API tokens (dstok_ prefix).
Token structure (signed JSON):
{
"a": "actor_id", # Actor ID
"t": 1234567890, # Timestamp (Unix epoch)
"d": 3600, # Optional: Duration in seconds
"_r": {...} # Optional: Restrictions
}
Authenticate requests using API tokens by delegating to all registered
token handlers via datasette.verify_token().
"""
prefix = "dstok_"
# Check if tokens are enabled
if not datasette.setting("allow_signed_tokens"):
return None
max_signed_tokens_ttl = datasette.setting("max_signed_tokens_ttl")
# Get authorization header
authorization = request.headers.get("authorization")
if not authorization:
return None
@ -46,50 +37,4 @@ def actor_from_signed_api_token(datasette: "Datasette", request) -> Optional[dic
return None
token = authorization[len("Bearer ") :]
if not token.startswith(prefix):
return None
# Remove prefix and verify signature
token = token[len(prefix) :]
try:
decoded = datasette.unsign(token, namespace="token")
except itsdangerous.BadSignature:
return None
# Validate timestamp
if "t" not in decoded:
return None
created = decoded["t"]
if not isinstance(created, int):
return None
# Handle duration/expiry
duration = decoded.get("d")
if duration is not None and not isinstance(duration, int):
return None
# Apply max TTL if configured
if (duration is None and max_signed_tokens_ttl) or (
duration is not None
and max_signed_tokens_ttl
and duration > max_signed_tokens_ttl
):
duration = max_signed_tokens_ttl
# Check expiry
if duration:
if time.time() - created > duration:
return None
# Build actor dict
actor = {"id": decoded["a"], "token": "dstok"}
# Copy restrictions if present
if "_r" in decoded:
actor["_r"] = decoded["_r"]
# Add expiry timestamp if applicable
if duration:
actor["token_expires"] = created + duration
return actor
return await datasette.verify_token(token)

View file

@ -222,6 +222,11 @@ def top_canned_query(datasette, request, database, query_name):
"""HTML to include at the top of the canned query page"""
@hookspec
def register_token_handler(datasette):
"""Return a TokenHandler instance for token creation and verification"""
@hookspec
def write_wrapper(datasette, database, request, transaction):
"""Called when a write function is about to execute.

View file

@ -23,6 +23,7 @@ DEFAULT_PLUGINS = (
"datasette.sql_functions",
"datasette.actor_auth_cookie",
"datasette.default_permissions",
"datasette.default_permissions.tokens",
"datasette.default_actions",
"datasette.default_magic_parameters",
"datasette.blob_renderer",

180
datasette/tokens.py Normal file
View file

@ -0,0 +1,180 @@
"""
Token handler system for Datasette.
Provides a base class for token handlers and the default signed token handler.
Plugins can implement register_token_handler to provide custom token backends
(e.g. database-backed tokens that can be revoked and audited).
"""
from __future__ import annotations
import dataclasses
import time
from typing import TYPE_CHECKING, Optional
import itsdangerous
if TYPE_CHECKING:
from datasette.app import Datasette
@dataclasses.dataclass
class TokenRestrictions:
"""
Restrictions to apply to a token, limiting which actions it can perform.
Use the builder methods to construct restrictions::
restrictions = (TokenRestrictions()
.allow_all("view-instance")
.allow_database("mydb", "create-table")
.allow_resource("mydb", "mytable", "insert-row"))
"""
all: list[str] = dataclasses.field(default_factory=list)
database: dict[str, list[str]] = dataclasses.field(default_factory=dict)
resource: dict[str, dict[str, list[str]]] = dataclasses.field(default_factory=dict)
def allow_all(self, action: str) -> "TokenRestrictions":
"""Allow an action across all databases and resources."""
self.all.append(action)
return self
def allow_database(self, database: str, action: str) -> "TokenRestrictions":
"""Allow an action on a specific database."""
self.database.setdefault(database, []).append(action)
return self
def allow_resource(
self, database: str, resource: str, action: str
) -> "TokenRestrictions":
"""Allow an action on a specific resource within a database."""
self.resource.setdefault(database, {}).setdefault(resource, []).append(action)
return self
class TokenHandler:
"""
Base class for token handlers.
Subclass this and implement create_token() and verify_token() to provide
a custom token backend. Return an instance from the register_token_handler hook.
"""
name: str = ""
async def create_token(
self,
datasette: "Datasette",
actor_id: str,
*,
expires_after: Optional[int] = None,
restrictions: Optional[TokenRestrictions] = None,
) -> str:
"""Create and return a token string for the given actor."""
raise NotImplementedError
async def verify_token(self, datasette: "Datasette", token: str) -> Optional[dict]:
"""
Verify a token and return an actor dict, or None if this handler
does not recognize the token.
"""
raise NotImplementedError
class SignedTokenHandler(TokenHandler):
"""
Default token handler using itsdangerous signed tokens (dstok_ prefix).
"""
name = "signed"
async def create_token(
self,
datasette: "Datasette",
actor_id: str,
*,
expires_after: Optional[int] = None,
restrictions: Optional[TokenRestrictions] = None,
) -> str:
if not datasette.setting("allow_signed_tokens"):
raise ValueError(
"Signed tokens are not enabled for this Datasette instance"
)
token = {"a": actor_id, "t": int(time.time())}
def abbreviate_action(action):
action_obj = datasette.actions.get(action)
if not action_obj:
return action
return action_obj.abbr or action
if expires_after:
token["d"] = expires_after
if restrictions and (
restrictions.all or restrictions.database or restrictions.resource
):
token["_r"] = {}
if restrictions.all:
token["_r"]["a"] = [abbreviate_action(a) for a in restrictions.all]
if restrictions.database:
token["_r"]["d"] = {}
for database, actions in restrictions.database.items():
token["_r"]["d"][database] = [abbreviate_action(a) for a in actions]
if restrictions.resource:
token["_r"]["r"] = {}
for database, resources in restrictions.resource.items():
for resource, actions in resources.items():
token["_r"]["r"].setdefault(database, {})[resource] = [
abbreviate_action(a) for a in actions
]
return "dstok_{}".format(datasette.sign(token, namespace="token"))
async def verify_token(self, datasette: "Datasette", token: str) -> Optional[dict]:
prefix = "dstok_"
if not datasette.setting("allow_signed_tokens"):
return None
max_signed_tokens_ttl = datasette.setting("max_signed_tokens_ttl")
if not token.startswith(prefix):
return None
raw = token[len(prefix) :]
try:
decoded = datasette.unsign(raw, namespace="token")
except itsdangerous.BadSignature:
return None
if "t" not in decoded:
return None
created = decoded["t"]
if not isinstance(created, int):
return None
duration = decoded.get("d")
if duration is not None and not isinstance(duration, int):
return None
if (duration is None and max_signed_tokens_ttl) or (
duration is not None
and max_signed_tokens_ttl
and duration > max_signed_tokens_ttl
):
duration = max_signed_tokens_ttl
if duration:
if time.time() - created > duration:
return None
actor = {"id": decoded["a"], "token": "dstok"}
if "_r" in decoded:
actor["_r"] = decoded["_r"]
if duration:
actor["token_expires"] = created + duration
return actor

View file

@ -710,42 +710,36 @@ class CreateTokenView(BaseView):
errors.append("Invalid expire duration unit")
# Are there any restrictions?
restrict_all = []
restrict_database = {}
restrict_resource = {}
from datasette.tokens import TokenRestrictions
restrictions = TokenRestrictions()
for key in form:
if key.startswith("all:") and key.count(":") == 1:
restrict_all.append(key.split(":")[1])
restrictions.allow_all(key.split(":")[1])
elif key.startswith("database:") and key.count(":") == 2:
bits = key.split(":")
database = tilde_decode(bits[1])
action = bits[2]
restrict_database.setdefault(database, []).append(action)
restrictions.allow_database(tilde_decode(bits[1]), bits[2])
elif key.startswith("resource:") and key.count(":") == 3:
bits = key.split(":")
database = tilde_decode(bits[1])
resource = tilde_decode(bits[2])
action = bits[3]
restrict_resource.setdefault(database, {}).setdefault(
resource, []
).append(action)
restrictions.allow_resource(
tilde_decode(bits[1]), tilde_decode(bits[2]), bits[3]
)
token = self.ds.create_token(
token = await self.ds.create_token(
request.actor["id"],
expires_after=expires_after,
restrict_all=restrict_all,
restrict_database=restrict_database,
restrict_resource=restrict_resource,
restrictions=restrictions,
handler="signed",
)
token_bits = self.ds.unsign(token[len("dstok_") :], namespace="token")
await self.ds.track_event(
CreateTokenEvent(
actor=request.actor,
expires_after=expires_after,
restrict_all=restrict_all,
restrict_database=restrict_database,
restrict_resource=restrict_resource,
restrict_all=restrictions.all,
restrict_database=restrictions.database,
restrict_resource=restrictions.resource,
)
)
context = await self.shared(request)

View file

@ -1072,6 +1072,7 @@ cannot grant new access. If the underlying actor is denied by ``allow`` rules in
``datasette.yaml`` or by a plugin, a token that lists that resource in its
``"_r"`` section will still be denied.
To create tokens with restrictions in Python code, use the :ref:`TokenRestrictions <TokenRestrictions>` builder and pass it to :ref:`datasette.create_token() <datasette_create_token>`.
.. _permissions_plugins:

View file

@ -673,8 +673,8 @@ This example checks if the user can access a specific table, and sets ``private`
.. _datasette_create_token:
.create_token(actor_id, expires_after=None, restrict_all=None, restrict_database=None, restrict_resource=None)
--------------------------------------------------------------------------------------------------------------
await .create_token(actor_id, expires_after=None, restrictions=None, handler=None)
----------------------------------------------------------------------------------
``actor_id`` - string
The ID of the actor to create a token for.
@ -682,16 +682,13 @@ This example checks if the user can access a specific table, and sets ``private`
``expires_after`` - int, optional
The number of seconds after which the token should expire.
``restrict_all`` - iterable, optional
A list of actions that this token should be restricted to across all databases and resources.
``restrictions`` - :ref:`TokenRestrictions <TokenRestrictions>`, optional
A :ref:`TokenRestrictions <TokenRestrictions>` object limiting which actions the token can perform.
``restrict_database`` - dict, optional
For restricting actions within specific databases, e.g. ``{"mydb": ["view-table", "view-query"]}``.
``handler`` - string, optional
The name of a specific token handler to use. If omitted, the first registered handler is used. See :ref:`plugin_hook_register_token_handler`.
``restrict_resource`` - dict, optional
For restricting actions to specific resources (tables, SQL views and :ref:`canned_queries`) within a database. For example: ``{"mydb": {"mytable": ["insert-row", "update-row"]}}``.
This method returns a signed :ref:`API token <CreateTokenView>` of the format ``dstok_...`` which can be used to authenticate requests to the Datasette API.
This is an ``async`` method that returns an :ref:`API token <CreateTokenView>` string which can be used to authenticate requests to the Datasette API. The default ``SignedTokenHandler`` returns tokens of the format ``dstok_...``.
All tokens must have an ``actor_id`` string indicating the ID of the actor which the token will act on behalf of.
@ -699,28 +696,72 @@ Tokens default to lasting forever, but can be set to expire after a given number
.. code-block:: python
token = datasette.create_token(
token = await datasette.create_token(
actor_id="user1",
expires_after=3600,
)
The three ``restrict_*`` arguments can be used to create a token that has additional restrictions beyond what the associated actor is allowed to do.
.. _TokenRestrictions:
TokenRestrictions
~~~~~~~~~~~~~~~~~
The ``TokenRestrictions`` class uses a builder pattern to specify which actions a token is allowed to perform. Import it from ``datasette.tokens``:
.. code-block:: python
from datasette.tokens import TokenRestrictions
restrictions = (
TokenRestrictions()
.allow_all("view-instance")
.allow_all("view-table")
.allow_database("docs", "view-query")
.allow_resource("docs", "attachments", "insert-row")
.allow_resource("docs", "attachments", "update-row")
)
The builder methods are:
- ``allow_all(action)`` - allow an action across all databases and resources
- ``allow_database(database, action)`` - allow an action on a specific database
- ``allow_resource(database, resource, action)`` - allow an action on a specific resource (table, SQL view or :ref:`canned query <canned_queries>`) within a database
Each method returns the ``TokenRestrictions`` instance so calls can be chained.
The following example creates a token that can access ``view-instance`` and ``view-table`` across everything, can additionally use ``view-query`` for anything in the ``docs`` database and is allowed to execute ``insert-row`` and ``update-row`` in the ``attachments`` table in that database:
.. code-block:: python
token = datasette.create_token(
token = await datasette.create_token(
actor_id="user1",
restrict_all=("view-instance", "view-table"),
restrict_database={"docs": ("view-query",)},
restrict_resource={
"docs": {
"attachments": ("insert-row", "update-row")
}
},
restrictions=(
TokenRestrictions()
.allow_all("view-instance")
.allow_all("view-table")
.allow_database("docs", "view-query")
.allow_resource("docs", "attachments", "insert-row")
.allow_resource("docs", "attachments", "update-row")
),
)
.. _datasette_verify_token:
await .verify_token(token)
--------------------------
``token`` - string
The token string to verify.
This is an ``async`` method that verifies an API token by trying each registered token handler in order. Returns an actor dictionary from the first handler that recognizes the token, or ``None`` if no handler accepts it.
.. code-block:: python
actor = await datasette.verify_token(token)
if actor:
# Token was valid
print(actor["id"])
.. _datasette_get_database:
.get_database(name)

View file

@ -2334,3 +2334,62 @@ The plugin can then call ``datasette.track_event(...)`` to send a ``ban-user`` e
BanUserEvent(user={"id": 1, "username": "cleverbot"})
)
.. _plugin_hook_register_token_handler:
register_token_handler(datasette)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
``datasette`` - :ref:`internals_datasette`
You can use this to access plugin configuration options via ``datasette.plugin_config(your_plugin_name)``.
Return a ``TokenHandler`` instance to provide a custom token creation and verification backend. This hook can return a single ``TokenHandler`` or a list of them.
The default ``SignedTokenHandler`` uses itsdangerous signed tokens (``dstok_`` prefix). Plugins can provide alternative backends such as database-backed tokens that support revocation and auditing.
.. code-block:: python
from datasette import hookimpl, TokenHandler
class DatabaseTokenHandler(TokenHandler):
name = "database"
async def create_token(
self,
datasette,
actor_id,
*,
expires_after=None,
restrictions=None
):
# Store token in database and return token string
...
async def verify_token(self, datasette, token):
# Look up token in database, return actor dict or None
...
@hookimpl
def register_token_handler(datasette):
return DatabaseTokenHandler()
The ``create_token`` method receives a ``restrictions`` argument which will be a :ref:`TokenRestrictions <TokenRestrictions>` instance or ``None``.
Tokens can then be created and verified using :ref:`datasette.create_token() <datasette_create_token>` and ``datasette.verify_token()``, which delegate to the registered handlers. If no ``handler`` is specified, the first handler is used according to `pluggy call-time ordering <https://pluggy.readthedocs.io/en/stable/#call-time-order>`_. Use the ``handler`` parameter to select a specific backend by name:
.. code-block:: python
# Uses first registered handler (default)
token = await datasette.create_token("user123")
# Uses a specific handler by name
token = await datasette.create_token(
"user123", handler="database"
)
# Verification tries all handlers
actor = await datasette.verify_token(token)
If no handlers are registered, ``create_token()`` raises ``RuntimeError``. If the requested ``handler`` name is not found, it raises ``ValueError``.

View file

@ -231,12 +231,21 @@ If you run ``datasette plugins --all`` it will include default plugins that ship
"templates": false,
"version": null,
"hooks": [
"actor_from_request",
"canned_queries",
"permission_resources_sql",
"skip_csrf"
]
},
{
"name": "datasette.default_permissions.tokens",
"static": false,
"templates": false,
"version": null,
"hooks": [
"actor_from_request",
"register_token_handler"
]
},
{
"name": "datasette.events",
"static": false,

View file

@ -114,3 +114,43 @@ Instead, one should use the following methods on a Datasette class:
```{include} upgrade-1.0a20.md
:heading-offset: 1
```
(upgrade_guide_v1_a25)=
### Datasette 1.0a25: `create_token()` signature change
`datasette.create_token()` is now an `async` method (previously it was synchronous). The `restrict_all`, `restrict_database`, and `restrict_resource` keyword arguments have been replaced by a single `restrictions` parameter that accepts a {ref}`TokenRestrictions <TokenRestrictions>` object.
Old code:
```python
token = datasette.create_token(
actor_id="user1",
restrict_all=["view-instance", "view-table"],
restrict_database={"docs": ["view-query"]},
restrict_resource={
"docs": {
"attachments": ["insert-row", "update-row"]
}
},
)
```
New code:
```python
from datasette.tokens import TokenRestrictions
token = await datasette.create_token(
actor_id="user1",
restrictions=(
TokenRestrictions()
.allow_all("view-instance")
.allow_all("view-table")
.allow_database("docs", "view-query")
.allow_resource("docs", "attachments", "insert-row")
.allow_resource("docs", "attachments", "update-row")
),
)
```
The `datasette create-token` CLI command is unchanged.

View file

@ -51,6 +51,7 @@ EXPECTED_PLUGINS = [
"register_facet_classes",
"register_magic_parameters",
"register_routes",
"register_token_handler",
"render_cell",
"row_actions",
"skip_csrf",

View file

@ -1,6 +1,7 @@
import asyncio
from datasette import hookimpl
from datasette.facets import Facet
from datasette.tokens import TokenHandler
from datasette import tracer
from datasette.permissions import Action
from datasette.resources import DatabaseResource
@ -586,3 +587,29 @@ def permission_resources_sql(datasette, actor, action):
return PermissionSQL.allow(reason=f"todomvc actor allowed for {action}")
return None
class HardcodedTokenHandler(TokenHandler):
name = "hardcoded"
_counter = 0
async def create_token(
self,
datasette,
actor_id,
*,
expires_after=None,
restrictions=None,
):
HardcodedTokenHandler._counter += 1
return f"dstok_hardcoded_token_{HardcodedTokenHandler._counter}"
async def verify_token(self, datasette, token):
if token.startswith("dstok_hardcoded_token_"):
return {"id": "hardcoded-actor", "token": "hardcoded"}
return None
@hookimpl
def register_token_handler(datasette):
return HardcodedTokenHandler()

View file

@ -1362,7 +1362,14 @@ async def test_create_table(
async def test_create_table_permissions(
ds_write, permissions, body, expected_status, expected_errors
):
token = ds_write.create_token("root", restrict_all=["view-instance"] + permissions)
from datasette.tokens import TokenRestrictions
restrictions = TokenRestrictions()
for action in ["view-instance"] + permissions:
restrictions.allow_all(action)
token = await ds_write.create_token(
"root", handler="signed", restrictions=restrictions
)
response = await ds_write.client.post(
"/data/-/create",
json=body,

View file

@ -1657,7 +1657,7 @@ async def test_permission_check_view_requires_debug_permission():
# Root user should have access (root has all permissions)
ds_with_root = Datasette()
ds_with_root.root_enabled = True
root_token = ds_with_root.create_token("root")
root_token = await ds_with_root.create_token("root", handler="signed")
response = await ds_with_root.client.get(
"/-/check.json?action=view-instance",
headers={"Authorization": f"Bearer {root_token}"},

View file

@ -1566,6 +1566,38 @@ async def test_hook_register_events():
assert any(k.__name__ == "OneEvent" for k in datasette.event_classes)
@pytest.mark.asyncio
async def test_hook_register_token_handler(ds_client):
handlers = ds_client.ds._token_handlers()
handler_names = [h.name for h in handlers]
# Both the default signed handler and the test hardcoded handler
assert "signed" in handler_names
assert "hardcoded" in handler_names
# Create a token using the hardcoded handler (first registered from plugins dir)
token = await ds_client.ds.create_token("test-user")
assert token.startswith("dstok_hardcoded_token_")
# Verify it
actor = await ds_client.ds.verify_token(token)
assert actor["id"] == "hardcoded-actor"
assert actor["token"] == "hardcoded"
# Create a token by explicitly requesting the hardcoded handler by name
token2 = await ds_client.ds.create_token("test-user", handler="hardcoded")
assert token2.startswith("dstok_hardcoded_token_")
actor2 = await ds_client.ds.verify_token(token2)
assert actor2["id"] == "hardcoded-actor"
# Create a token by explicitly requesting the signed handler by name
signed_token = await ds_client.ds.create_token("test-user", handler="signed")
assert signed_token.startswith("dstok_")
assert not signed_token.startswith("dstok_hardcoded_token_")
signed_actor = await ds_client.ds.verify_token(signed_token)
assert signed_actor["id"] == "test-user"
assert signed_actor["token"] == "dstok"
@pytest.mark.asyncio
async def test_hook_write_wrapper():
datasette = Datasette(memory=True)

301
tests/test_token_handler.py Normal file
View file

@ -0,0 +1,301 @@
"""
Tests for the register_token_handler plugin hook.
"""
from datasette.app import Datasette
from datasette.hookspecs import hookimpl
from datasette.plugins import pm
from datasette.tokens import TokenHandler, TokenRestrictions, SignedTokenHandler
import pytest
@pytest.fixture
def datasette():
return Datasette()
@pytest.mark.asyncio
async def test_default_signed_handler_registered(datasette):
"""The default SignedTokenHandler should be registered automatically."""
handlers = datasette._token_handlers()
assert len(handlers) >= 1
assert any(isinstance(h, SignedTokenHandler) for h in handlers)
assert any(h.name == "signed" for h in handlers)
@pytest.mark.asyncio
async def test_create_token_default(datasette):
"""create_token() with handler='signed' should create a signed token."""
token = await datasette.create_token("test_actor", handler="signed")
assert token.startswith("dstok_")
@pytest.mark.asyncio
async def test_create_token_with_restrictions(datasette):
"""create_token() should handle restriction parameters."""
token = await datasette.create_token(
"test_actor",
handler="signed",
expires_after=3600,
restrictions=TokenRestrictions().allow_all("view-instance"),
)
assert token.startswith("dstok_")
# Verify the token contains the expected data
decoded = datasette.unsign(token[len("dstok_") :], namespace="token")
assert decoded["a"] == "test_actor"
assert decoded["d"] == 3600
assert "_r" in decoded
assert "a" in decoded["_r"]
@pytest.mark.asyncio
async def test_verify_token_default(datasette):
"""verify_token() should verify signed tokens."""
token = await datasette.create_token("test_actor", handler="signed")
actor = await datasette.verify_token(token)
assert actor is not None
assert actor["id"] == "test_actor"
assert actor["token"] == "dstok"
@pytest.mark.asyncio
async def test_verify_token_unknown_returns_none(datasette):
"""verify_token() should return None for unrecognized tokens."""
result = await datasette.verify_token("unknown_token_format_xyz")
assert result is None
@pytest.mark.asyncio
async def test_verify_token_bad_signature_returns_none(datasette):
"""verify_token() should return None for tokens with bad signatures."""
result = await datasette.verify_token("dstok_tampered_data_here")
assert result is None
@pytest.mark.asyncio
async def test_create_token_with_named_handler(datasette):
"""create_token(handler='signed') should select the signed handler."""
token = await datasette.create_token("test_actor", handler="signed")
assert token.startswith("dstok_")
@pytest.mark.asyncio
async def test_create_token_unknown_handler_raises(datasette):
"""create_token(handler='nonexistent') should raise ValueError."""
with pytest.raises(ValueError, match="Token handler 'nonexistent' not found"):
await datasette.create_token("test_actor", handler="nonexistent")
@pytest.mark.asyncio
async def test_custom_token_handler(datasette):
"""A custom token handler should be usable for both create and verify."""
class CustomHandler(TokenHandler):
name = "custom"
async def create_token(self, datasette, actor_id, **kwargs):
return f"custom_{actor_id}"
async def verify_token(self, datasette, token):
if token.startswith("custom_"):
return {"id": token[len("custom_") :], "token": "custom"}
return None
class Plugin:
__name__ = "CustomTokenPlugin"
@staticmethod
@hookimpl
def register_token_handler(datasette):
return CustomHandler()
pm.register(Plugin(), name="test_custom_handler")
try:
handlers = datasette._token_handlers()
assert any(h.name == "custom" for h in handlers)
# Create with custom handler
token = await datasette.create_token("alice", handler="custom")
assert token == "custom_alice"
# Verify custom token
actor = await datasette.verify_token("custom_alice")
assert actor is not None
assert actor["id"] == "alice"
assert actor["token"] == "custom"
# Signed tokens should still work
signed_token = await datasette.create_token("bob", handler="signed")
assert signed_token.startswith("dstok_")
actor = await datasette.verify_token(signed_token)
assert actor["id"] == "bob"
finally:
pm.unregister(name="test_custom_handler")
@pytest.mark.asyncio
async def test_verify_token_tries_all_handlers(datasette):
"""verify_token() should try each handler until one matches."""
class HandlerA(TokenHandler):
name = "handler_a"
async def create_token(self, datasette, actor_id, **kwargs):
return f"a_{actor_id}"
async def verify_token(self, datasette, token):
if token.startswith("a_"):
return {"id": token[2:], "token": "handler_a"}
return None
class HandlerB(TokenHandler):
name = "handler_b"
async def create_token(self, datasette, actor_id, **kwargs):
return f"b_{actor_id}"
async def verify_token(self, datasette, token):
if token.startswith("b_"):
return {"id": token[2:], "token": "handler_b"}
return None
class PluginA:
__name__ = "PluginA"
@staticmethod
@hookimpl
def register_token_handler(datasette):
return HandlerA()
class PluginB:
__name__ = "PluginB"
@staticmethod
@hookimpl
def register_token_handler(datasette):
return HandlerB()
pm.register(PluginA(), name="test_handler_a")
pm.register(PluginB(), name="test_handler_b")
try:
# Both handler tokens should verify
actor_a = await datasette.verify_token("a_alice")
assert actor_a is not None
assert actor_a["id"] == "alice"
assert actor_a["token"] == "handler_a"
actor_b = await datasette.verify_token("b_bob")
assert actor_b is not None
assert actor_b["id"] == "bob"
assert actor_b["token"] == "handler_b"
# Unknown token should return None
assert await datasette.verify_token("c_charlie") is None
finally:
pm.unregister(name="test_handler_a")
pm.unregister(name="test_handler_b")
@pytest.mark.asyncio
async def test_token_handler_via_http(datasette):
"""Default signed tokens should work through HTTP auth."""
token = await datasette.create_token("http_user", handler="signed")
response = await datasette.client.get(
"/-/actor.json",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200
actor = response.json()["actor"]
assert actor["id"] == "http_user"
assert actor["token"] == "dstok"
@pytest.mark.asyncio
async def test_custom_handler_via_http(datasette):
"""Custom handler tokens should work through HTTP auth."""
class CustomHandler(TokenHandler):
name = "custom_http"
async def create_token(self, datasette, actor_id, **kwargs):
return f"chttp_{actor_id}"
async def verify_token(self, datasette, token):
if token.startswith("chttp_"):
return {"id": token[len("chttp_") :], "token": "custom_http"}
return None
class Plugin:
__name__ = "CustomHTTPPlugin"
@staticmethod
@hookimpl
def register_token_handler(datasette):
return CustomHandler()
pm.register(Plugin(), name="test_custom_http")
try:
token = await datasette.create_token("web_user", handler="custom_http")
response = await datasette.client.get(
"/-/actor.json",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200
actor = response.json()["actor"]
assert actor["id"] == "web_user"
assert actor["token"] == "custom_http"
finally:
pm.unregister(name="test_custom_http")
@pytest.mark.asyncio
async def test_token_handler_base_class_raises():
"""TokenHandler base class methods should raise NotImplementedError."""
handler = TokenHandler()
ds = Datasette()
with pytest.raises(NotImplementedError):
await handler.create_token(ds, "test")
with pytest.raises(NotImplementedError):
await handler.verify_token(ds, "test")
@pytest.mark.asyncio
async def test_restrictions_round_trip(datasette):
"""Tokens with database/resource restrictions should round-trip correctly."""
restrictions = (
TokenRestrictions()
.allow_all("view-instance")
.allow_database("docs", "view-query")
.allow_resource("docs", "attachments", "insert-row")
)
token = await datasette.create_token(
"test_actor", handler="signed", restrictions=restrictions
)
actor = await datasette.verify_token(token)
assert actor is not None
assert actor["id"] == "test_actor"
assert actor["_r"]["a"] == ["view-instance"]
assert actor["_r"]["d"] == {"docs": ["view-query"]}
assert actor["_r"]["r"] == {"docs": {"attachments": ["insert-row"]}}
@pytest.mark.asyncio
async def test_expires_after_round_trip(datasette):
"""Tokens with expires_after should include token_expires in the actor."""
token = await datasette.create_token(
"test_actor", handler="signed", expires_after=3600
)
actor = await datasette.verify_token(token)
assert actor is not None
assert actor["id"] == "test_actor"
assert "token_expires" in actor
@pytest.mark.asyncio
async def test_signed_tokens_disabled():
"""create_token and verify_token should fail/skip when signed tokens are disabled."""
ds = Datasette(settings={"allow_signed_tokens": False})
with pytest.raises(ValueError, match="Signed tokens are not enabled"):
await ds.create_token("test_actor", handler="signed")
# verify_token should return None rather than raising
assert await ds.verify_token("dstok_anything") is None