CSRF protection (#798)

Closes #793.

* Rename RequestParameters to MultiParams, refs #799
* Allow tuples as well as lists in MultiParams, refs #799
* Use csrftokens when running tests, refs #799
* Use new csrftoken() function, refs https://github.com/simonw/asgi-csrf/issues/7
* Check for Vary: Cookie hedaer, refs https://github.com/simonw/asgi-csrf/issues/8
This commit is contained in:
Simon Willison 2020-06-05 12:05:57 -07:00 committed by GitHub
commit 84a9c4ff75
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 67 additions and 19 deletions

View file

@ -1,5 +1,5 @@
from datasette.app import Datasette
from datasette.utils import sqlite3
from datasette.utils import sqlite3, MultiParams
from asgiref.testing import ApplicationCommunicator
from asgiref.sync import async_to_sync
from http.cookies import SimpleCookie
@ -60,10 +60,35 @@ class TestClient:
@async_to_sync
async def post(
self, path, post_data=None, allow_redirects=True, redirect_count=0, cookies=None
self,
path,
post_data=None,
allow_redirects=True,
redirect_count=0,
content_type="application/x-www-form-urlencoded",
cookies=None,
csrftoken_from=None,
):
cookies = cookies or {}
post_data = post_data or {}
# Maybe fetch a csrftoken first
if csrftoken_from is not None:
if csrftoken_from is True:
csrftoken_from = path
token_response = await self._request(csrftoken_from)
# Check this had a Vary: Cookie header
assert "Cookie" == token_response.headers["vary"]
csrftoken = token_response.cookies["ds_csrftoken"]
cookies["ds_csrftoken"] = csrftoken
post_data["csrftoken"] = csrftoken
return await self._request(
path, allow_redirects, redirect_count, "POST", cookies, post_data
path,
allow_redirects,
redirect_count,
"POST",
cookies,
post_data,
content_type,
)
async def _request(
@ -74,6 +99,7 @@ class TestClient:
method="GET",
cookies=None,
post_data=None,
content_type=None,
):
query_string = b""
if "?" in path:
@ -84,6 +110,8 @@ class TestClient:
else:
raw_path = quote(path, safe="/:,").encode("latin-1")
headers = [[b"host", b"localhost"]]
if content_type:
headers.append((b"content-type", content_type.encode("utf-8")))
if cookies:
sc = SimpleCookie()
for key, value in cookies.items():
@ -111,7 +139,7 @@ class TestClient:
start = await instance.receive_output(2)
messages.append(start)
assert start["type"] == "http.response.start"
headers = dict(
response_headers = MultiParams(
[(k.decode("utf8"), v.decode("utf8")) for k, v in start["headers"]]
)
status = start["status"]
@ -124,7 +152,7 @@ class TestClient:
body += message["body"]
if not message.get("more_body"):
break
response = TestResponse(status, headers, body)
response = TestResponse(status, response_headers, body)
if allow_redirects and response.status in (301, 302):
assert (
redirect_count < self.max_redirects