Magic parameters for canned queries

Closes #842

Includes a new plugin hook, register_magic_parameters()
This commit is contained in:
Simon Willison 2020-06-27 19:58:16 -07:00 committed by GitHub
commit 563f5a2d3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 477 additions and 167 deletions

View file

@ -49,6 +49,7 @@ EXPECTED_PLUGINS = [
"prepare_connection",
"prepare_jinja2_environment",
"register_facet_classes",
"register_magic_parameters",
"register_routes",
"render_cell",
"startup",

View file

@ -212,3 +212,25 @@ def canned_queries(datasette, database, actor):
actor["id"] if actor else "null"
)
}
@hookimpl
def register_magic_parameters():
from uuid import uuid4
def uuid(key, request):
if key == "new":
return str(uuid4())
else:
raise KeyError
def request(key, request):
if key == "http_version":
return request.scope["http_version"]
else:
raise KeyError
return [
("request", request),
("uuid", uuid),
]

View file

@ -601,18 +601,6 @@ def test_custom_sql(app_client):
assert not data["truncated"]
def test_canned_query_with_named_parameter(app_client):
response = app_client.get("/fixtures/neighborhood_search.json?text=town")
assert [
["Corktown", "Detroit", "MI"],
["Downtown", "Los Angeles", "CA"],
["Downtown", "Detroit", "MI"],
["Greektown", "Detroit", "MI"],
["Koreatown", "Los Angeles", "CA"],
["Mexicantown", "Detroit", "MI"],
] == response.json["rows"]
def test_sql_time_limit(app_client_shorter_time_limit):
response = app_client_shorter_time_limit.get("/fixtures.json?sql=select+sleep(0.5)")
assert 400 == response.status

View file

@ -1,5 +1,7 @@
from bs4 import BeautifulSoup as Soup
import pytest
from .fixtures import make_app_client
import re
from .fixtures import make_app_client, app_client
@pytest.fixture
@ -39,6 +41,18 @@ def canned_write_client():
yield client
def test_canned_query_with_named_parameter(app_client):
response = app_client.get("/fixtures/neighborhood_search.json?text=town")
assert [
["Corktown", "Detroit", "MI"],
["Downtown", "Los Angeles", "CA"],
["Downtown", "Detroit", "MI"],
["Greektown", "Detroit", "MI"],
["Koreatown", "Los Angeles", "CA"],
["Mexicantown", "Detroit", "MI"],
] == response.json["rows"]
def test_insert(canned_write_client):
response = canned_write_client.post(
"/data/add_name", {"name": "Hello"}, allow_redirects=False, csrftoken_from=True,
@ -147,3 +161,74 @@ def test_canned_query_permissions(canned_write_client):
cookies = {"ds_actor": canned_write_client.actor_cookie({"id": "root"})}
assert 200 == canned_write_client.get("/data/delete_name", cookies=cookies).status
assert 200 == canned_write_client.get("/data/update_name", cookies=cookies).status
@pytest.fixture(scope="session")
def magic_parameters_client():
with make_app_client(
extra_databases={"data.db": "create table logs (line text)"},
metadata={
"databases": {
"data": {
"queries": {
"runme_post": {"sql": "", "write": True},
"runme_get": {"sql": ""},
}
}
}
},
) as client:
yield client
@pytest.mark.parametrize(
"magic_parameter,expected_re",
[
("_actor_id", "root"),
("_header_host", "localhost"),
("_cookie_foo", "bar"),
("_timestamp_epoch", r"^\d+$"),
("_timestamp_date_utc", r"^\d{4}-\d{2}-\d{2}$"),
("_timestamp_datetime_utc", r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$"),
("_random_chars_1", r"^\w$"),
("_random_chars_10", r"^\w{10}$"),
],
)
def test_magic_parameters(magic_parameters_client, magic_parameter, expected_re):
magic_parameters_client.ds._metadata["databases"]["data"]["queries"]["runme_post"][
"sql"
] = "insert into logs (line) values (:{})".format(magic_parameter)
magic_parameters_client.ds._metadata["databases"]["data"]["queries"]["runme_get"][
"sql"
] = "select :{} as result".format(magic_parameter)
cookies = {
"ds_actor": magic_parameters_client.actor_cookie({"id": "root"}),
"foo": "bar",
}
# Test the GET version
get_response = magic_parameters_client.get(
"/data/runme_get.json?_shape=array", cookies=cookies
)
get_actual = get_response.json[0]["result"]
assert re.match(expected_re, str(get_actual))
# Test the form
form_response = magic_parameters_client.get("/data/runme_post")
soup = Soup(form_response.body, "html.parser")
# The magic parameter should not be represented as a form field
assert None is soup.find("input", {"name": magic_parameter})
# Submit the form to create a log line
response = magic_parameters_client.post(
"/data/runme_post", {}, csrftoken_from=True, cookies=cookies
)
post_actual = magic_parameters_client.get(
"/data/logs.json?_sort_desc=rowid&_shape=array"
).json[0]["line"]
assert re.match(expected_re, post_actual)
def test_magic_parameters_cannot_be_used_in_arbitrary_queries(magic_parameters_client):
response = magic_parameters_client.get(
"/data.json?sql=select+:_header_host&_shape=array"
)
assert 500 == response.status
assert "You did not supply a value for binding 1." == response.json["error"]

View file

@ -638,3 +638,31 @@ def test_canned_queries_actor(app_client):
assert [{"1": 1, "actor_id": "bot"}] == app_client.get(
"/fixtures/from_hook.json?_bot=1&_shape=array"
).json
def test_register_magic_parameters(restore_working_directory):
with make_app_client(
extra_databases={"data.db": "create table logs (line text)"},
metadata={
"databases": {
"data": {
"queries": {
"runme": {
"sql": "insert into logs (line) values (:_request_http_version)",
"write": True,
},
"get_uuid": {"sql": "select :_uuid_new",},
}
}
}
},
) as client:
response = client.post("/data/runme", {}, csrftoken_from=True)
assert 200 == response.status
actual = client.get("/data/logs.json?_sort_desc=rowid&_shape=array").json
assert [{"rowid": 1, "line": "1.0"}] == actual
# Now try the GET request against get_uuid
response_get = client.get("/data/get_uuid.json?_shape=array")
assert 200 == response_get.status
new_uuid = response_get.json[0][":_uuid_new"]
assert 4 == new_uuid.count("-")