diff --git a/datasette/utils/__init__.py b/datasette/utils/__init__.py
index bdebfc30..94ccc23e 100644
--- a/datasette/utils/__init__.py
+++ b/datasette/utils/__init__.py
@@ -748,7 +748,7 @@ class RequestParameters(dict):
"Return first value in the list, if available"
try:
return super().get(name)[0]
- except KeyError:
+ except (KeyError, TypeError):
return default
def getlist(self, name, default=None):
diff --git a/datasette/utils/asgi.py b/datasette/utils/asgi.py
index 6e005ab3..08cb44aa 100644
--- a/datasette/utils/asgi.py
+++ b/datasette/utils/asgi.py
@@ -1,6 +1,7 @@
import json
+from datasette.utils import RequestParameters
from mimetypes import guess_type
-from sanic.request import Request as SanicRequest
+from urllib.parse import parse_qs, urlunparse
from pathlib import Path
from html import escape
import re
@@ -11,6 +12,73 @@ class NotFound(Exception):
pass
+class Request:
+ def __init__(self, scope):
+ self.scope = scope
+
+ @property
+ def method(self):
+ return self.scope["method"]
+
+ @property
+ def url(self):
+ return urlunparse(
+ (self.scheme, self.host, self.path, None, self.query_string, None)
+ )
+
+ @property
+ def scheme(self):
+ return self.scope.get("scheme") or "http"
+
+ @property
+ def headers(self):
+ return dict(
+ [
+ (k.decode("latin-1").lower(), v.decode("latin-1"))
+ for k, v in self.scope.get("headers") or []
+ ]
+ )
+
+ @property
+ def host(self):
+ return self.headers.get("host") or "localhost"
+
+ @property
+ def path(self):
+ return (
+ self.scope.get("raw_path", self.scope["path"].encode("latin-1"))
+ ).decode("latin-1")
+
+ @property
+ def query_string(self):
+ return (self.scope.get("query_string") or b"").decode("latin-1")
+
+ @property
+ def args(self):
+ return RequestParameters(parse_qs(qs=self.query_string))
+
+ @property
+ def raw_args(self):
+ return {key: value[0] for key, value in self.args.items()}
+
+ @classmethod
+ def from_path_with_query_string(
+ cls, path_with_query_string, method="GET", scheme="http"
+ ):
+ "Useful for constructing Request objects for tests"
+ path, _, query_string = path_with_query_string.partition("?")
+ scope = {
+ "http_version": "1.1",
+ "method": method,
+ "path": path,
+ "raw_path": path.encode("latin-1"),
+ "query_string": query_string.encode("latin-1"),
+ "scheme": scheme,
+ "type": "http",
+ }
+ return cls(scope)
+
+
class AsgiRouter:
def __init__(self, routes=None):
routes = routes or []
@@ -52,7 +120,7 @@ class AsgiRouter:
}
)
html = "
500
".format(escape(repr(exception)))
- await send({"type": "http.response.body", "body": html.encode("utf8")})
+ await send({"type": "http.response.body", "body": html.encode("latin-1")})
class AsgiLifespan:
@@ -92,34 +160,12 @@ class AsgiView:
@classmethod
def as_asgi(cls, *class_args, **class_kwargs):
async def view(scope, receive, send):
- # Uses scope to create a Sanic-compatible request object,
- # then dispatches that to self.get(...) or self.options(...)
- # along with keyword arguments that were already tucked
- # into scope["url_route"]["kwargs"] by the router
+ # Uses scope to create a request object, then dispatches that to
+ # self.get(...) or self.options(...) along with keyword arguments
+ # that were already tucked into scope["url_route"]["kwargs"] by
+ # the router, similar to how Django Channels works:
# https://channels.readthedocs.io/en/latest/topics/routing.html#urlrouter
- path = scope.get("raw_path", scope["path"].encode("utf8"))
- if scope["query_string"]:
- path = path + b"?" + scope["query_string"]
- request = SanicRequest(
- path,
- {
- "Host": dict(scope.get("headers") or [])
- .get(b"host", b"")
- .decode("utf8")
- },
- "1.1",
- scope["method"],
- None,
- )
-
- # TODO: Remove need for this
- class Woo:
- def get_extra_info(self, key):
- return False
-
- request.app = Woo()
- request.app.websocket_enabled = False
- request.transport = Woo()
+ request = Request(scope)
self = view.view_class(*class_args, **class_kwargs)
response = await self.dispatch_request(
request, **scope["url_route"]["kwargs"]
@@ -185,7 +231,7 @@ class AsgiWriter:
await self.send(
{
"type": "http.response.body",
- "body": chunk.encode("utf8"),
+ "body": chunk.encode("latin-1"),
"more_body": True,
}
)
@@ -221,7 +267,7 @@ async def asgi_send_redirect(send, location, status=302):
async def asgi_send(send, content, status, headers=None, content_type="text/plain"):
await asgi_start(send, status, headers, content_type)
- await send({"type": "http.response.body", "body": content.encode("utf8")})
+ await send({"type": "http.response.body", "body": content.encode("latin-1")})
async def asgi_start(send, status, headers=None, content_type="text/plain"):
diff --git a/pytest.ini b/pytest.ini
index f2c8a6d2..aa292efc 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -4,7 +4,5 @@ filterwarnings=
ignore:Using or importing the ABCs::jinja2
# https://bugs.launchpad.net/beautifulsoup/+bug/1778909
ignore:Using or importing the ABCs::bs4.element
- # Sanic verify_ssl=True
- ignore:verify_ssl is deprecated::sanic
# Python 3.7 PendingDeprecationWarning: Task.current_task()
ignore:.*current_task.*:PendingDeprecationWarning
diff --git a/tests/test_utils.py b/tests/test_utils.py
index a5f603e6..cbf9eba7 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -3,11 +3,11 @@ Tests for various datasette helper functions.
"""
from datasette import utils
+from datasette.utils.asgi import Request
from datasette.filters import Filters
import json
import os
import pytest
-from sanic.request import Request
import sqlite3
import tempfile
from unittest.mock import patch
@@ -53,7 +53,7 @@ def test_urlsafe_components(path, expected):
],
)
def test_path_with_added_args(path, added_args, expected):
- request = Request(path.encode("utf8"), {}, "1.1", "GET", None)
+ request = Request.from_path_with_query_string(path)
actual = utils.path_with_added_args(request, added_args)
assert expected == actual
@@ -67,11 +67,11 @@ def test_path_with_added_args(path, added_args, expected):
],
)
def test_path_with_removed_args(path, args, expected):
- request = Request(path.encode("utf8"), {}, "1.1", "GET", None)
+ request = Request.from_path_with_query_string(path)
actual = utils.path_with_removed_args(request, args)
assert expected == actual
# Run the test again but this time use the path= argument
- request = Request("/".encode("utf8"), {}, "1.1", "GET", None)
+ request = Request.from_path_with_query_string("/")
actual = utils.path_with_removed_args(request, args, path=path)
assert expected == actual
@@ -84,7 +84,7 @@ def test_path_with_removed_args(path, args, expected):
],
)
def test_path_with_replaced_args(path, args, expected):
- request = Request(path.encode("utf8"), {}, "1.1", "GET", None)
+ request = Request.from_path_with_query_string(path)
actual = utils.path_with_replaced_args(request, args)
assert expected == actual
@@ -363,7 +363,7 @@ def test_table_columns():
],
)
def test_path_with_format(path, format, extra_qs, expected):
- request = Request(path.encode("utf8"), {}, "1.1", "GET", None)
+ request = Request.from_path_with_query_string(path)
actual = utils.path_with_format(request, format, extra_qs)
assert expected == actual