Replace token-based CSRF with Sec-Fetch-Site header protection (#2689)

- New CSRF protection middleware inspired by Go 1.25 and research by Filippo Valsorda - https://words.filippo.io/csrf/ - this replaces the old CSRF token based protection.
- Removes all instances of `<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">` in the templates - they are no longer needed.
- Removes the `def skip_csrf(datasette, scope):` plugin hook defined in `datasette/hookspecs.py` and its documentation and tests.
- Updated CSRF protection documentation to describe the new approach.
- Upgrade guide now describes the CSRF change.
This commit is contained in:
Simon Willison 2026-04-14 17:11:36 -07:00 committed by GitHub
commit 0b639a8122
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 506 additions and 163 deletions

View file

@ -1,6 +1,5 @@
from __future__ import annotations
from asgi_csrf import Errors
import asyncio
import contextvars
from typing import TYPE_CHECKING, Any, Dict, Iterable, List
@ -8,7 +7,6 @@ from typing import TYPE_CHECKING, Any, Dict, Iterable, List
if TYPE_CHECKING:
from datasette.permissions import Resource
from datasette.tokens import TokenRestrictions
import asgi_csrf
import collections
import dataclasses
import datetime
@ -120,6 +118,7 @@ from .utils.asgi import (
asgi_send_file,
asgi_send_redirect,
)
from .csrf import CrossOriginProtectionMiddleware
from .utils.internal_db import init_internal_db, populate_schema_tables
from .utils.sqlite import (
sqlite3,
@ -2003,7 +2002,11 @@ class Datasette:
"extra_js_urls", template, context, request, view_name
),
"base_url": self.setting("base_url"),
"csrftoken": request.scope["csrftoken"] if request else lambda: "",
"csrftoken": (
request.scope["csrftoken"]
if request and "csrftoken" in request.scope
else lambda: ""
),
"datasette_version": __version__,
},
**extra_template_vars,
@ -2306,26 +2309,7 @@ class Datasette:
if not database.is_mutable:
await database.table_counts(limit=60 * 60 * 1000)
async def custom_csrf_error(scope, send, message_id):
await asgi_send(
send,
content=await self.render_template(
"csrf_error.html",
{"message_id": message_id, "message_name": Errors(message_id).name},
),
status=403,
content_type="text/html; charset=utf-8",
)
asgi = asgi_csrf.asgi_csrf(
DatasetteRouter(self, routes),
signing_secret=self._secret,
cookie_name="ds_csrftoken",
skip_if_scope=lambda scope: any(
pm.hook.skip_csrf(datasette=self, scope=scope)
),
send_csrf_failed=custom_csrf_error,
)
asgi = CrossOriginProtectionMiddleware(DatasetteRouter(self, routes), self)
if self.setting("trace_debug"):
asgi = AsgiTracer(asgi)
asgi = AsgiLifespan(asgi)

126
datasette/csrf.py Normal file
View file

@ -0,0 +1,126 @@
"""
Header-based CSRF (Cross-Origin) protection.
Datasette uses the Sec-Fetch-Site + Origin header approach described in
Filippo Valsorda's article (https://words.filippo.io/csrf/) and implemented
in Go 1.25's http.CrossOriginProtection. This replaces the previous
token-based asgi-csrf mechanism.
"""
from __future__ import annotations
import secrets
import urllib.parse
from .utils.asgi import asgi_send
SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})
def _install_legacy_csrftoken(scope):
"""
Populate ``scope["csrftoken"]`` with a callable returning a per-request
random token. Provided for plugin compatibility only - core no longer
uses this value for CSRF enforcement.
"""
def csrftoken():
if "_datasette_legacy_csrftoken" not in scope:
scope["_datasette_legacy_csrftoken"] = secrets.token_urlsafe(32)
return scope["_datasette_legacy_csrftoken"]
scope["csrftoken"] = csrftoken
class CrossOriginProtectionMiddleware:
"""
Modern CSRF protection using the Sec-Fetch-Site and Origin headers.
Based on Filippo Valsorda's algorithm, as implemented in Go 1.25's
http.CrossOriginProtection. See https://words.filippo.io/csrf/
Unsafe-method requests are allowed through only if they look same-origin.
Non-browser clients (curl, etc.) send neither Sec-Fetch-Site nor Origin
and are passed through unchanged - CSRF is a browser-only attack.
"""
SAFE_METHODS = SAFE_METHODS
def __init__(self, app, datasette):
self.app = app
self.datasette = datasette
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
_install_legacy_csrftoken(scope)
if scope.get("method", "GET") in self.SAFE_METHODS:
await self.app(scope, receive, send)
return
headers = dict(scope.get("headers") or [])
# Bearer-token requests are not ambient browser credentials, so they
# are not CSRF-vulnerable. Narrowly exempt them from the header check
# before evaluating Sec-Fetch-Site / Origin. Only "Bearer" is exempt;
# schemes like Basic or Digest can be browser-managed and ambient.
authorization = headers.get(b"authorization", b"").decode("latin-1")
if authorization:
scheme = authorization.split(None, 1)[0].lower()
if scheme == "bearer":
await self.app(scope, receive, send)
return
origin_bytes = headers.get(b"origin")
sec_fetch_site_bytes = headers.get(b"sec-fetch-site")
host_bytes = headers.get(b"host", b"")
origin = origin_bytes.decode("latin-1") if origin_bytes else None
sec_fetch_site = (
sec_fetch_site_bytes.decode("latin-1") if sec_fetch_site_bytes else None
)
host = host_bytes.decode("latin-1")
# Primary defense: Sec-Fetch-Site (set by browsers, unforgeable from JS)
if sec_fetch_site is not None:
if sec_fetch_site in ("same-origin", "none"):
await self.app(scope, receive, send)
return
await self._forbid(
send,
"Sec-Fetch-Site was {!r}, expected 'same-origin' or 'none'".format(
sec_fetch_site
),
)
return
# No Sec-Fetch-Site and no Origin -> non-browser client (curl, API, etc.)
if origin is None:
await self.app(scope, receive, send)
return
# Fallback for older browsers: Origin host must match Host header
parsed = urllib.parse.urlparse(origin)
origin_host = parsed.hostname or ""
if parsed.port:
origin_host = "{}:{}".format(origin_host, parsed.port)
if origin_host == host:
await self.app(scope, receive, send)
return
await self._forbid(
send,
"Origin {!r} does not match Host {!r}".format(origin, host),
)
async def _forbid(self, send, reason):
await asgi_send(
send,
content=await self.datasette.render_template(
"csrf_error.html", {"reason": reason}
),
status=403,
content_type="text/html; charset=utf-8",
)

View file

@ -17,7 +17,7 @@ UNION/INTERSECT operations. The order of evaluation is:
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from datasette.app import Datasette
@ -39,16 +39,6 @@ from .defaults import (
)
@hookimpl
def skip_csrf(scope) -> Optional[bool]:
"""Skip CSRF check for JSON content-type requests."""
if scope["type"] == "http":
headers = scope.get("headers") or {}
if dict(headers).get(b"content-type") == b"application/json":
return True
return None
@hookimpl
def canned_queries(datasette: "Datasette", database: str, actor) -> dict:
"""Return canned queries defined in datasette.yaml configuration."""

View file

@ -187,11 +187,6 @@ def homepage_actions(datasette, actor, request):
"""Links for the homepage actions menu"""
@hookspec
def skip_csrf(datasette, scope):
"""Mechanism for skipping CSRF checks for certain requests"""
@hookspec
def handle_exception(datasette, request, exception):
"""Handle an uncaught exception. Can return a Response or None."""

View file

@ -38,7 +38,6 @@
{% endif %}
{% if show_logout %}
<form class="nav-menu-logout" action="{{ urls.logout() }}" method="post">
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
<button class="button-as-link">Log out</button>
</form>{% endif %}
</div>

View file

@ -50,7 +50,6 @@
</select>
</div>
<input type="text" name="expire_duration" style="width: 10%">
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
<input type="submit" value="Create token">
<details style="margin-top: 1em" id="restrict-permissions">

View file

@ -1,5 +1,5 @@
{% extends "base.html" %}
{% block title %}CSRF check failed){% endblock %}
{% block title %}CSRF check failed{% endblock %}
{% block content %}
<h1>Form origin check failed</h1>
@ -7,7 +7,7 @@
<details><summary>Technical details</summary>
<p>Developers: consult Datasette's <a href="https://docs.datasette.io/en/latest/internals.html#csrf-protection">CSRF protection documentation</a>.</p>
<p>Error code is {{ message_name }}.</p>
<p>Reason: {{ reason }}</p>
</details>
{% endblock %}

View file

@ -52,7 +52,6 @@ textarea {
<div class="permission-form">
<form action="{{ urls.path('-/permissions') }}" id="debug-post" method="post">
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
<div class="two-col">
<div class="form-section">
<label>Actor</label>

View file

@ -10,7 +10,6 @@
<form class="core" action="{{ urls.logout() }}" method="post">
<div>
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
<input type="submit" value="Log out">
</div>
</form>

View file

@ -19,7 +19,6 @@
<option>all</option>
</select>
</div>
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
<input type="submit" value="Add message">
</div>
</form>

View file

@ -65,7 +65,6 @@
{% endif %}
<p>
{% if not hide_sql %}<button id="sql-format" type="button" hidden>Format SQL</button>{% endif %}
{% if canned_query_write %}<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">{% endif %}
<input type="submit" value="Run SQL"{% if canned_query_write and db_is_immutable %} disabled{% endif %}>
{{ show_hide_hidden }}
{% if canned_query and edit_sql_url %}<a href="{{ edit_sql_url }}" class="canned-query-edit-sql">Edit SQL</a>{% endif %}

View file

@ -95,15 +95,8 @@ class TestClient:
cookies = cookies or {}
post_data = post_data or {}
assert not (post_data and body), "Provide one or other of body= or post_data="
# Maybe fetch a csrftoken first
if csrftoken_from is not None:
assert body is None, "body= is not compatible with csrftoken_from="
if csrftoken_from is True:
csrftoken_from = path
token_response = await self._request(csrftoken_from, cookies=cookies)
csrftoken = token_response.cookies["ds_csrftoken"]
cookies["ds_csrftoken"] = csrftoken
post_data["csrftoken"] = csrftoken
# csrftoken_from is accepted for backward compatibility but is now a no-op.
# Datasette no longer uses CSRF tokens - see CrossOriginProtectionMiddleware.
if post_data:
body = urlencode(post_data, doseq=True)
return await self._request(

View file

@ -1941,19 +1941,16 @@ The ``Database`` class also provides properties and methods for introspecting th
CSRF protection
===============
Datasette uses `asgi-csrf <https://github.com/simonw/asgi-csrf>`__ to guard against CSRF attacks on form POST submissions. Users receive a ``ds_csrftoken`` cookie which is compared against the ``csrftoken`` form field (or ``x-csrftoken`` HTTP header) for every incoming request.
Datasette protects against Cross-Site Request Forgery by inspecting the browser-set ``Sec-Fetch-Site`` and ``Origin`` headers on every unsafe (non-``GET``/``HEAD``/``OPTIONS``) request, following the approach described in `Filippo Valsorda's article <https://words.filippo.io/csrf/>`__ and implemented in Go 1.25's ``http.CrossOriginProtection``.
If your plugin implements a ``<form method="POST">`` anywhere you will need to include that token. You can do so with the following template snippet:
A request is rejected with a ``403`` response if:
.. code-block:: html
- It carries ``Sec-Fetch-Site`` with any value other than ``same-origin`` or ``none``, or
- It has no ``Sec-Fetch-Site`` header but does carry an ``Origin`` header whose host does not match the request ``Host``.
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
Requests from non-browser clients (``curl``, server-to-server scripts, etc.) do not send ``Sec-Fetch-Site`` or ``Origin`` and pass through unchanged - CSRF is a browser-only attack.
If you are rendering templates using the :ref:`datasette_render_template` method the ``csrftoken()`` helper will only work if you provide the ``request=`` argument to that method. If you forget to do this you will see the following error::
form-urlencoded POST field did not match cookie
You can selectively disable CSRF protection using the :ref:`plugin_hook_skip_csrf` hook.
No token, cookie, or hidden form field is needed. Any ``<form method="POST">`` inside Datasette or a plugin will be accepted from the same origin without modification.
.. _internals_internal:

View file

@ -1837,31 +1837,6 @@ This example logs an error to `Sentry <https://sentry.io/>`__ and then renders a
Example: `datasette-sentry <https://datasette.io/plugins/datasette-sentry>`_
.. _plugin_hook_skip_csrf:
skip_csrf(datasette, scope)
---------------------------
``datasette`` - :ref:`internals_datasette`
You can use this to access plugin configuration options via ``datasette.plugin_config(your_plugin_name)``, or to execute SQL queries.
``scope`` - dictionary
The `ASGI scope <https://asgi.readthedocs.io/en/latest/specs/www.html#http-connection-scope>`__ for the incoming HTTP request.
This hook can be used to skip :ref:`internals_csrf` for a specific incoming request. For example, you might have a custom path at ``/submit-comment`` which is designed to accept comments from anywhere, whether or not the incoming request originated on the site and has an accompanying CSRF token.
This example will disable CSRF protection for that specific URL path:
.. code-block:: python
from datasette import hookimpl
@hookimpl
def skip_csrf(scope):
return scope["path"] == "/submit-comment"
If any of the currently active ``skip_csrf()`` plugin hooks return ``True``, CSRF protection will be skipped for the request.
.. _plugin_hook_menu_links:

View file

@ -241,8 +241,7 @@ If you run ``datasette plugins --all`` it will include default plugins that ship
"version": null,
"hooks": [
"canned_queries",
"permission_resources_sql",
"skip_csrf"
"permission_resources_sql"
]
},
{

View file

@ -235,9 +235,8 @@ As an example, here's a very simple plugin which executes an HTTP response and r
if request.method == "GET":
return Response.html("""
<form action="/-/fetch-url" method="post">
<input type="hidden" name="csrftoken" value="{}">
<input name="url"><input type="submit">
</form>""".format(request.scope["csrftoken"]()))
</form>""")
vars = await request.post_vars()
url = vars["url"]
return Response.text(httpx.get(url).text)

View file

@ -155,3 +155,63 @@ token = await datasette.create_token(
```
The `datasette create-token` CLI command is unchanged.
(upgrade_guide_csrf)=
### CSRF protection is now header-based
Datasette's Cross-Site Request Forgery protection no longer uses tokens. The previous `asgi-csrf` mechanism - which set a `ds_csrftoken` cookie and required a matching `<input type="hidden" name="csrftoken">` in every form - has been replaced with an ASGI middleware that inspects the browser-set `Sec-Fetch-Site` and `Origin` headers, following the approach described in [Filippo Valsorda's research](https://words.filippo.io/csrf/) and implemented in Go 1.25's `http.CrossOriginProtection`.
This works identically on HTTPS, HTTP, and localhost. Non-browser clients (curl, Python `requests`, server-to-server scripts) do not send `Sec-Fetch-Site` or `Origin` and are passed through unchanged - CSRF is a browser-only attack.
Requests that carry an explicit `Authorization: Bearer ...` header are also exempt from the CSRF check, because bearer tokens are not ambient browser credentials: a malicious cross-origin page cannot cause the browser to attach a target site's bearer token unless the attacker's JavaScript already possesses it. This exemption is narrow - it covers the `Bearer` scheme only, not `Basic` or `Digest` - and it does not depend on the `--cors` setting. The exemption is about CSRF classification, not browser read access; CORS still controls the latter.
#### What you can remove
You can now delete any of the following from your plugins and custom templates:
- Hidden CSRF form fields:
```html
<input type="hidden" name="csrftoken" value="{{ csrftoken() }}">
```
The `csrftoken()` template helper (and `request.scope["csrftoken"]()` for plugins that call it from Python) still exists as a compatibility shim. It now returns a per-request random string rather than a cookie-bound signed value. Datasette no longer validates this token, and no `ds_csrftoken` cookie is set.
**Important for plugin authors:** if your plugin previously used `request.scope["csrftoken"]()` or the `ds_csrftoken` cookie as a security primitive (for example, signing a URL and later comparing it to the cookie), the invariant that the token equals `request.cookies["ds_csrftoken"]` no longer holds. Replace those flows with signed, short-lived action URLs or explicit non-ambient credentials.
- Manual CSRF token extraction in tests, e.g.:
```python
# No longer needed
csrftoken = response.cookies["ds_csrftoken"]
cookies["ds_csrftoken"] = csrftoken
post_data["csrftoken"] = csrftoken
```
The `ds_csrftoken` cookie is no longer set at all. The `csrftoken_from=` argument of the Datasette test client's `.post()` method is now a no-op and can be removed from your test code.
#### Breaking changes
- **The `skip_csrf` plugin hook has been removed.** Existing plugins that still declare a `skip_csrf` hookimpl will continue to load - pluggy silently ignores unknown hook names - but the hook is no longer consulted by core, so the flows it previously unlocked will now be blocked (or allowed) purely on the basis of the new header check.
The new middleware already covers the common cases that `skip_csrf` was written for:
- Browser-initiated JSON POSTs automatically get `Sec-Fetch-Site: same-origin` and pass the check.
- Non-browser API clients (curl, `requests`, server-to-server scripts) do not send browser security headers and are passed through.
- Requests with an explicit `Authorization: Bearer ...` header are exempt from the CSRF check (see above).
If your plugin previously used `skip_csrf` to accept cross-origin browser POSTs, replace that flow with an authentication mechanism that does **not** rely on ambient browser credentials. Safe patterns include:
- Requiring an `Authorization: Bearer ...` API token on the endpoint.
- Requiring a non-ambient credential in the request body (a webhook secret, HMAC signature, signed capability URL, OAuth client credential, or similar).
- Issuing a short-lived signed URL that encodes the actor, the action, and an expiry, and verifying the signature on request.
Do not rely on the `ds_csrftoken` cookie for your own plugin's security checks - Datasette no longer sets or validates it, and the `request.scope["csrftoken"]()` compatibility shim now returns a fresh random value each request rather than the signed cookie-bound value it used to.
- **The `asgi-csrf` dependency has been dropped.** Any plugin that imported from `asgi_csrf` directly will need to be updated.
- **The `csrf_error.html` template now receives a `reason` context variable** instead of `message_id` and `message_name`. Custom overrides of this template should be updated.
#### Security properties
For defense-in-depth the `ds_actor` and `ds_messages` cookies continue to be set with `SameSite=Lax` (Datasette's long-standing default). This means a genuine cross-site POST from an attacker's page would arrive without the user's authentication cookie even if the header check somehow failed.

View file

@ -33,7 +33,6 @@ dependencies = [
"uvicorn>=0.11",
"aiofiles>=0.4",
"janus>=0.6.2",
"asgi-csrf>=0.10",
"PyYAML>=5.3",
"mergedeep>=1.1.1",
"itsdangerous>=1.1",

View file

@ -42,6 +42,17 @@ def wait_until_responds(url, timeout=5.0, client=httpx, **kwargs):
raise AssertionError("Timed out waiting for {} to respond".format(url))
@pytest.fixture
def bare_ds():
"""
Minimal Datasette with no plugins, data, metadata, or config - for tests
that want to exercise core behavior (e.g. middleware) in isolation.
"""
from datasette.app import Datasette
return Datasette(memory=True)
@pytest_asyncio.fixture
async def ds_client():
from datasette.app import Datasette

View file

@ -54,7 +54,6 @@ EXPECTED_PLUGINS = [
"register_token_handler",
"render_cell",
"row_actions",
"skip_csrf",
"startup",
"table_actions",
"view_actions",

View file

@ -444,11 +444,6 @@ def homepage_actions(datasette, actor, request):
]
@hookimpl
def skip_csrf(scope):
return scope["path"] == "/skip-csrf"
@hookimpl
def register_actions(datasette):
extras_old = datasette.plugin_config("datasette-register-permissions") or {}

View file

@ -109,31 +109,12 @@ def test_insert(canned_write_client):
assert response.headers["Location"] == "/data/add_name?success"
@pytest.mark.parametrize(
"query_name,expect_csrf_hidden_field",
[
("canned_read", False),
("add_name_specify_id", True),
("add_name", True),
],
)
def test_canned_query_form_csrf_hidden_field(
canned_write_client, query_name, expect_csrf_hidden_field
):
response = canned_write_client.get(f"/data/{query_name}")
html = response.text
fragment = '<input type="hidden" name="csrftoken" value="'
if expect_csrf_hidden_field:
assert fragment in html
else:
assert fragment not in html
def test_insert_with_cookies_requires_csrf(canned_write_client):
def test_insert_blocked_cross_site(canned_write_client):
# A cross-site POST (browser-originated) must be blocked
response = canned_write_client.post(
"/data/add_name",
{"name": "Hello"},
cookies={"foo": "bar"},
headers={"sec-fetch-site": "cross-site"},
)
assert 403 == response.status
@ -218,10 +199,11 @@ def test_custom_params(canned_write_client):
assert '<input type="text" id="qp3" name="extra" value="foo">' in response.text
def test_vary_header(canned_write_client):
# These forms embed a csrftoken so they should be served with Vary: Cookie
def test_canned_query_pages_no_vary_header(canned_write_client):
# These pages no longer embed per-cookie CSRF tokens, so they must not
# set Vary: Cookie - they should be cacheable across users.
assert "vary" not in canned_write_client.get("/data").headers
assert "Cookie" == canned_write_client.get("/data/update_name").headers["vary"]
assert "vary" not in canned_write_client.get("/data/update_name").headers
def test_json_post_body(canned_write_client):

View file

@ -0,0 +1,270 @@
"""
Tests for the header-based CSRF (Cross-Origin) protection middleware.
Datasette uses the Sec-Fetch-Site + Origin header approach described in
Filippo Valsorda's article (https://words.filippo.io/csrf/) and implemented
in Go 1.25's http.CrossOriginProtection. This replaces the previous
token-based asgi-csrf mechanism.
"""
import pluggy
import pytest
from datasette import hookimpl
from datasette.csrf import CrossOriginProtectionMiddleware, _install_legacy_csrftoken
async def _post(bare_ds, **kwargs):
kwargs.setdefault("data", {"message": "hello", "message_class": "info"})
return await bare_ds.client.post("/-/messages", **kwargs)
async def _run_middleware(scope):
"""
Run CrossOriginProtectionMiddleware against a scope and return
("allowed",) if the inner app was called, or ("blocked", status)
if the middleware sent a response itself.
"""
class FakeDs:
async def render_template(self, name, ctx):
return "BLOCKED"
inner_called = []
async def app(scope, receive, send):
inner_called.append(True)
sent = []
async def send(msg):
sent.append(msg)
mw = CrossOriginProtectionMiddleware(app, FakeDs())
await mw(scope, None, send)
if inner_called:
return ("allowed",)
start = [m for m in sent if m["type"] == "http.response.start"][0]
return ("blocked", start["status"])
def _http_scope(headers, method="POST"):
return {
"type": "http",
"method": method,
"headers": [(k.encode(), v.encode()) for k, v in headers.items()],
}
@pytest.mark.asyncio
@pytest.mark.parametrize("method", ["GET", "HEAD", "OPTIONS"])
async def test_safe_methods_always_pass(bare_ds, method):
# Safe methods bypass CSRF entirely, even with hostile headers
response = await bare_ds.client.request(
method,
"/-/messages",
headers={"sec-fetch-site": "cross-site", "origin": "http://evil.example"},
)
assert response.status_code != 403 or "origin" not in response.text.lower()
@pytest.mark.asyncio
@pytest.mark.parametrize("sec_fetch_site", ["same-origin", "none"])
async def test_post_with_trusted_sec_fetch_site_allowed(bare_ds, sec_fetch_site):
# "same-origin" = first-party; "none" = user-initiated direct navigation
response = await _post(bare_ds, headers={"sec-fetch-site": sec_fetch_site})
assert response.status_code != 403
@pytest.mark.asyncio
@pytest.mark.parametrize("sec_fetch_site", ["cross-site", "same-site", "cross-origin"])
async def test_post_with_untrusted_sec_fetch_site_blocked(bare_ds, sec_fetch_site):
# same-site is blocked too: different subdomains must not bypass CSRF
response = await _post(
bare_ds, data={"message": "hi"}, headers={"sec-fetch-site": sec_fetch_site}
)
assert response.status_code == 403
assert response.headers["content-type"].startswith("text/html")
@pytest.mark.asyncio
async def test_post_with_no_browser_headers_allowed(bare_ds):
# curl / requests / server-to-server: no Sec-Fetch-Site, no Origin.
# CSRF is browser-specific so these pass through.
response = await _post(bare_ds)
assert response.status_code != 403
@pytest.mark.asyncio
async def test_post_with_matching_origin_allowed(bare_ds):
# Fallback for older browsers without Sec-Fetch-Site: Origin must match Host
response = await _post(bare_ds, headers={"origin": "http://localhost"})
assert response.status_code != 403
@pytest.mark.asyncio
async def test_post_with_mismatched_origin_blocked(bare_ds):
response = await _post(
bare_ds, data={"message": "hi"}, headers={"origin": "http://evil.example.com"}
)
assert response.status_code == 403
@pytest.mark.asyncio
async def test_csrf_error_page_renders(bare_ds):
response = await _post(
bare_ds, data={"message": "hi"}, headers={"sec-fetch-site": "cross-site"}
)
assert response.status_code == 403
assert "origin" in response.text.lower()
@pytest.mark.asyncio
async def test_csrf_error_page_title_has_no_typo(bare_ds):
response = await _post(
bare_ds, data={"message": "hi"}, headers={"sec-fetch-site": "cross-site"}
)
assert "<title>CSRF check failed</title>" in response.text
assert "CSRF check failed)" not in response.text
@pytest.mark.asyncio
@pytest.mark.parametrize("scope_type", ["websocket", "lifespan"])
async def test_non_http_scope_passes_through(scope_type):
called = []
async def app(scope, receive, send):
called.append(scope["type"])
mw = CrossOriginProtectionMiddleware(app, datasette=None)
await mw({"type": scope_type}, None, None)
assert called == [scope_type]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"label,headers,expected",
[
(
"plain cross-site blocked",
{"sec-fetch-site": "cross-site", "host": "example.com"},
("blocked", 403),
),
(
"basic auth does not bypass",
{
"sec-fetch-site": "cross-site",
"host": "example.com",
"authorization": "Basic dXNlcjpwYXNz",
},
("blocked", 403),
),
(
"bearer auth bypasses",
{
"sec-fetch-site": "cross-site",
"origin": "https://evil.example",
"host": "example.com",
"authorization": "Bearer dstok_abc",
},
("allowed",),
),
(
"bearer scheme case-insensitive",
{
"sec-fetch-site": "cross-site",
"host": "example.com",
"authorization": "bearer dstok_abc",
},
("allowed",),
),
(
"non-browser (no Sec-Fetch-Site, no Origin) allowed",
{"host": "example.com"},
("allowed",),
),
],
)
async def test_middleware_unit(label, headers, expected):
assert await _run_middleware(_http_scope(headers)) == expected
def test_legacy_csrftoken_scope_value_nonempty(app_client):
# GET /post/ calls request.scope["csrftoken"]() - must not 500
response = app_client.get("/post/")
assert response.status == 200
assert response.text.strip() != ""
assert len(response.text.strip()) >= 20
def test_legacy_csrftoken_no_ds_csrftoken_cookie(app_client):
response = app_client.get("/post/")
assert "ds_csrftoken" not in response.cookies
def test_legacy_csrftoken_varies_across_requests(app_client):
r1 = app_client.get("/post/").text.strip()
r2 = app_client.get("/post/").text.strip()
assert r1 != r2
def test_legacy_csrftoken_stable_within_request():
# Two calls in the same request return the same value
scope = {}
_install_legacy_csrftoken(scope)
assert scope["csrftoken"]() == scope["csrftoken"]()
@pytest.mark.asyncio
async def test_cross_site_post_blocked_even_with_ds_csrftoken_cookie(bare_ds):
# A stale ds_csrftoken cookie + csrftoken body field must NOT bypass
# the header-based CSRF check.
response = await _post(
bare_ds,
data={"message": "hi", "message_class": "info", "csrftoken": "abc"},
headers={"sec-fetch-site": "cross-site"},
cookies={"ds_csrftoken": "abc"},
)
assert response.status_code == 403
@pytest.mark.asyncio
async def test_bearer_invalid_token_not_csrf_error(bare_ds):
# Cross-site POST with bogus bearer must pass CSRF and be rejected
# by auth/permission handling, not by the CSRF middleware.
response = await _post(
bare_ds,
headers={
"sec-fetch-site": "cross-site",
"authorization": "Bearer totally-invalid-token",
},
)
if response.status_code == 403:
assert "origin" not in response.text.lower()
assert "sec-fetch-site" not in response.text.lower()
@pytest.mark.asyncio
async def test_cross_site_post_without_auth_still_blocked(bare_ds):
response = await _post(
bare_ds, data={"message": "hi"}, headers={"sec-fetch-site": "cross-site"}
)
assert response.status_code == 403
def test_legacy_skip_csrf_hookimpl_does_not_break_loading():
# Plugins that still define skip_csrf must load cleanly - pluggy ignores
# unknown hook implementations - even though the hook is no longer
# consulted by core. Use a throwaway PluginManager so that registering
# this hookimpl does not leak a _HookCaller onto the real datasette.pm.
class LegacyPlugin:
__name__ = "legacy-skip-csrf-plugin"
@hookimpl
def skip_csrf(self, datasette, scope):
return True
throwaway = pluggy.PluginManager("datasette")
plugin = LegacyPlugin()
throwaway.register(plugin, name=LegacyPlugin.__name__)
assert throwaway.is_registered(plugin)

View file

@ -1202,11 +1202,12 @@ async def test_custom_csrf_error(ds_client):
data={
"message": "A message",
},
cookies={"csrftoken": "x"},
headers={"sec-fetch-site": "cross-site"},
)
assert response.status_code == 403
assert response.headers["content-type"] == "text/html; charset=utf-8"
assert "Error code is FORM_URLENCODED_MISMATCH." in response.text
assert "Reason:" in response.text
assert "cross-site" in response.text
@pytest.mark.asyncio

View file

@ -711,10 +711,6 @@ async def test_actor_restricted_permissions(
perms_ds.pdb = True
perms_ds.root_enabled = True # Allow root actor to access /-/permissions
cookies = {"ds_actor": perms_ds.sign({"a": {"id": "root"}}, "actor")}
csrftoken = (await perms_ds.client.get("/-/permissions", cookies=cookies)).cookies[
"ds_csrftoken"
]
cookies["ds_csrftoken"] = csrftoken
response = await perms_ds.client.post(
"/-/permissions",
data={
@ -722,7 +718,6 @@ async def test_actor_restricted_permissions(
"permission": permission,
"resource_1": resource_1,
"resource_2": resource_2,
"csrftoken": csrftoken,
},
cookies=cookies,
)

View file

@ -812,21 +812,25 @@ def test_hook_register_routes_override():
def test_hook_register_routes_post(app_client):
response = app_client.post("/post/", {"this is": "post data"}, csrftoken_from=True)
response = app_client.post("/post/", {"this is": "post data"})
assert response.status_code == 200
assert "csrftoken" in response.json
assert response.json["this is"] == "post data"
def test_hook_register_routes_csrftoken(restore_working_directory, tmpdir_factory):
# csrftoken() is a legacy compatibility shim that returns a
# per-request random value - it is no longer used for CSRF enforcement.
templates = tmpdir_factory.mktemp("templates")
(templates / "csrftoken_form.html").write_text(
"CSRFTOKEN: {{ csrftoken() }}", "utf-8"
"CSRFTOKEN:{{ csrftoken() }}:END", "utf-8"
)
with make_app_client(template_dir=templates) as client:
response = client.get("/csrftoken-form/")
expected_token = client.ds._last_request.scope["csrftoken"]()
assert f"CSRFTOKEN: {expected_token}" == response.text
assert response.text.startswith("CSRFTOKEN:")
assert response.text.endswith(":END")
token = response.text[len("CSRFTOKEN:") : -len(":END")]
assert len(token) >= 20
assert "ds_csrftoken" not in response.cookies
@pytest.mark.asyncio
@ -1125,31 +1129,6 @@ async def test_hook_homepage_actions(ds_client):
]
def test_hook_skip_csrf(app_client):
cookie = app_client.actor_cookie({"id": "test"})
csrf_response = app_client.post(
"/post/",
post_data={"this is": "post data"},
csrftoken_from=True,
cookies={"ds_actor": cookie},
)
assert csrf_response.status_code == 200
missing_csrf_response = app_client.post(
"/post/", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
)
assert missing_csrf_response.status_code == 403
# But "/skip-csrf" should allow
allow_csrf_response = app_client.post(
"/skip-csrf", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
)
assert allow_csrf_response.status_code == 405 # Method not allowed
# /skip-csrf-2 should not
second_missing_csrf_response = app_client.post(
"/skip-csrf-2", post_data={"this is": "post data"}, cookies={"ds_actor": cookie}
)
assert second_missing_csrf_response.status_code == 403
def _extract_commands(output):
lines = output.split("Commands:\n", 1)[1].split("\n")
return {line.split()[0].replace("*", "") for line in lines if line.strip()}