mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
Context manager with open closes the files after usage. When the object is already a pathlib.Path i used read_text write_text functions In some cases pathlib.Path.open were used in context manager, it is basically the same as builtin open. Thanks, Konstantin Baikov!
827 lines
28 KiB
Python
827 lines
28 KiB
Python
from bs4 import BeautifulSoup as Soup
|
|
from .fixtures import (
|
|
app_client,
|
|
make_app_client,
|
|
TABLES,
|
|
TEMP_PLUGIN_SECRET_FILE,
|
|
TestClient as _TestClient,
|
|
) # noqa
|
|
from datasette.app import Datasette
|
|
from datasette import cli
|
|
from datasette.plugins import get_plugins, DEFAULT_PLUGINS, pm
|
|
from datasette.utils.sqlite import sqlite3
|
|
from datasette.utils import CustomRow
|
|
from jinja2.environment import Template
|
|
import base64
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import textwrap
|
|
import pytest
|
|
import urllib
|
|
|
|
at_memory_re = re.compile(r" at 0x\w+")
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"plugin_hook", [name for name in dir(pm.hook) if not name.startswith("_")]
|
|
)
|
|
def test_plugin_hooks_have_tests(plugin_hook):
|
|
"""Every plugin hook should be referenced in this test module"""
|
|
tests_in_this_module = [t for t in globals().keys() if t.startswith("test_hook_")]
|
|
ok = False
|
|
for test in tests_in_this_module:
|
|
if plugin_hook in test:
|
|
ok = True
|
|
assert ok, f"Plugin hook is missing tests: {plugin_hook}"
|
|
|
|
|
|
def test_hook_plugins_dir_plugin_prepare_connection(app_client):
|
|
response = app_client.get(
|
|
"/fixtures.json?sql=select+convert_units(100%2C+'m'%2C+'ft')"
|
|
)
|
|
assert pytest.approx(328.0839) == response.json["rows"][0][0]
|
|
|
|
|
|
def test_hook_plugin_prepare_connection_arguments(app_client):
|
|
response = app_client.get(
|
|
"/fixtures.json?sql=select+prepare_connection_args()&_shape=arrayfirst"
|
|
)
|
|
assert [
|
|
"database=fixtures, datasette.plugin_config(\"name-of-plugin\")={'depth': 'root'}"
|
|
] == response.json
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path,expected_decoded_object",
|
|
[
|
|
(
|
|
"/",
|
|
{
|
|
"template": "index.html",
|
|
"database": None,
|
|
"table": None,
|
|
"view_name": "index",
|
|
"request_path": "/",
|
|
"added": 15,
|
|
"columns": None,
|
|
},
|
|
),
|
|
(
|
|
"/fixtures/",
|
|
{
|
|
"template": "database.html",
|
|
"database": "fixtures",
|
|
"table": None,
|
|
"view_name": "database",
|
|
"request_path": "/fixtures",
|
|
"added": 15,
|
|
"columns": None,
|
|
},
|
|
),
|
|
(
|
|
"/fixtures/sortable",
|
|
{
|
|
"template": "table.html",
|
|
"database": "fixtures",
|
|
"table": "sortable",
|
|
"view_name": "table",
|
|
"request_path": "/fixtures/sortable",
|
|
"added": 15,
|
|
"columns": [
|
|
"pk1",
|
|
"pk2",
|
|
"content",
|
|
"sortable",
|
|
"sortable_with_nulls",
|
|
"sortable_with_nulls_2",
|
|
"text",
|
|
],
|
|
},
|
|
),
|
|
],
|
|
)
|
|
def test_hook_extra_css_urls(app_client, path, expected_decoded_object):
|
|
response = app_client.get(path)
|
|
links = Soup(response.body, "html.parser").findAll("link")
|
|
special_href = [
|
|
l for l in links if l.attrs["href"].endswith("/extra-css-urls-demo.css")
|
|
][0]["href"]
|
|
# This link has a base64-encoded JSON blob in it
|
|
encoded = special_href.split("/")[3]
|
|
assert expected_decoded_object == json.loads(
|
|
base64.b64decode(encoded).decode("utf8")
|
|
)
|
|
|
|
|
|
def test_hook_extra_js_urls(app_client):
|
|
response = app_client.get("/")
|
|
scripts = Soup(response.body, "html.parser").findAll("script")
|
|
script_attrs = [s.attrs for s in scripts]
|
|
for attrs in [
|
|
{
|
|
"integrity": "SRIHASH",
|
|
"crossorigin": "anonymous",
|
|
"src": "https://plugin-example.datasette.io/jquery.js",
|
|
},
|
|
{
|
|
"src": "https://plugin-example.datasette.io/plugin.module.js",
|
|
"type": "module",
|
|
},
|
|
]:
|
|
assert any(s == attrs for s in script_attrs), "Expected: {}".format(attrs)
|
|
|
|
|
|
def test_plugins_with_duplicate_js_urls(app_client):
|
|
# If two plugins both require jQuery, jQuery should be loaded only once
|
|
response = app_client.get("/fixtures")
|
|
# This test is a little tricky, as if the user has any other plugins in
|
|
# their current virtual environment those may affect what comes back too.
|
|
# What matters is that https://plugin-example.datasette.io/jquery.js is only there once
|
|
# and it comes before plugin1.js and plugin2.js which could be in either
|
|
# order
|
|
scripts = Soup(response.body, "html.parser").findAll("script")
|
|
srcs = [s["src"] for s in scripts if s.get("src")]
|
|
# No duplicates allowed:
|
|
assert len(srcs) == len(set(srcs))
|
|
# jquery.js loaded once:
|
|
assert 1 == srcs.count("https://plugin-example.datasette.io/jquery.js")
|
|
# plugin1.js and plugin2.js are both there:
|
|
assert 1 == srcs.count("https://plugin-example.datasette.io/plugin1.js")
|
|
assert 1 == srcs.count("https://plugin-example.datasette.io/plugin2.js")
|
|
# jquery comes before them both
|
|
assert srcs.index("https://plugin-example.datasette.io/jquery.js") < srcs.index(
|
|
"https://plugin-example.datasette.io/plugin1.js"
|
|
)
|
|
assert srcs.index("https://plugin-example.datasette.io/jquery.js") < srcs.index(
|
|
"https://plugin-example.datasette.io/plugin2.js"
|
|
)
|
|
|
|
|
|
def test_hook_render_cell_link_from_json(app_client):
|
|
sql = """
|
|
select '{"href": "http://example.com/", "label":"Example"}'
|
|
""".strip()
|
|
path = "/fixtures?" + urllib.parse.urlencode({"sql": sql})
|
|
response = app_client.get(path)
|
|
td = Soup(response.body, "html.parser").find("table").find("tbody").find("td")
|
|
a = td.find("a")
|
|
assert a is not None, str(a)
|
|
assert a.attrs["href"] == "http://example.com/"
|
|
assert a.attrs["data-database"] == "fixtures"
|
|
assert a.text == "Example"
|
|
|
|
|
|
def test_hook_render_cell_demo(app_client):
|
|
response = app_client.get("/fixtures/simple_primary_key?id=4")
|
|
soup = Soup(response.body, "html.parser")
|
|
td = soup.find("td", {"class": "col-content"})
|
|
assert {
|
|
"column": "content",
|
|
"table": "simple_primary_key",
|
|
"database": "fixtures",
|
|
"config": {"depth": "table", "special": "this-is-simple_primary_key"},
|
|
} == json.loads(td.string)
|
|
|
|
|
|
def test_plugin_config(app_client):
|
|
assert {"depth": "table"} == app_client.ds.plugin_config(
|
|
"name-of-plugin", database="fixtures", table="sortable"
|
|
)
|
|
assert {"depth": "database"} == app_client.ds.plugin_config(
|
|
"name-of-plugin", database="fixtures", table="unknown_table"
|
|
)
|
|
assert {"depth": "database"} == app_client.ds.plugin_config(
|
|
"name-of-plugin", database="fixtures"
|
|
)
|
|
assert {"depth": "root"} == app_client.ds.plugin_config(
|
|
"name-of-plugin", database="unknown_database"
|
|
)
|
|
assert {"depth": "root"} == app_client.ds.plugin_config("name-of-plugin")
|
|
assert None is app_client.ds.plugin_config("unknown-plugin")
|
|
|
|
|
|
def test_plugin_config_env(app_client):
|
|
os.environ["FOO_ENV"] = "FROM_ENVIRONMENT"
|
|
assert {"foo": "FROM_ENVIRONMENT"} == app_client.ds.plugin_config("env-plugin")
|
|
# Ensure secrets aren't visible in /-/metadata.json
|
|
metadata = app_client.get("/-/metadata.json")
|
|
assert {"foo": {"$env": "FOO_ENV"}} == metadata.json["plugins"]["env-plugin"]
|
|
del os.environ["FOO_ENV"]
|
|
|
|
|
|
def test_plugin_config_env_from_list(app_client):
|
|
os.environ["FOO_ENV"] = "FROM_ENVIRONMENT"
|
|
assert [{"in_a_list": "FROM_ENVIRONMENT"}] == app_client.ds.plugin_config(
|
|
"env-plugin-list"
|
|
)
|
|
# Ensure secrets aren't visible in /-/metadata.json
|
|
metadata = app_client.get("/-/metadata.json")
|
|
assert [{"in_a_list": {"$env": "FOO_ENV"}}] == metadata.json["plugins"][
|
|
"env-plugin-list"
|
|
]
|
|
del os.environ["FOO_ENV"]
|
|
|
|
|
|
def test_plugin_config_file(app_client):
|
|
with open(TEMP_PLUGIN_SECRET_FILE, "w") as fp:
|
|
fp.write("FROM_FILE")
|
|
assert {"foo": "FROM_FILE"} == app_client.ds.plugin_config("file-plugin")
|
|
# Ensure secrets aren't visible in /-/metadata.json
|
|
metadata = app_client.get("/-/metadata.json")
|
|
assert {"foo": {"$file": TEMP_PLUGIN_SECRET_FILE}} == metadata.json["plugins"][
|
|
"file-plugin"
|
|
]
|
|
os.remove(TEMP_PLUGIN_SECRET_FILE)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path,expected_extra_body_script",
|
|
[
|
|
(
|
|
"/",
|
|
{
|
|
"template": "index.html",
|
|
"database": None,
|
|
"table": None,
|
|
"config": {"depth": "root"},
|
|
"view_name": "index",
|
|
"request_path": "/",
|
|
"added": 15,
|
|
"columns": None,
|
|
},
|
|
),
|
|
(
|
|
"/fixtures/",
|
|
{
|
|
"template": "database.html",
|
|
"database": "fixtures",
|
|
"table": None,
|
|
"config": {"depth": "database"},
|
|
"view_name": "database",
|
|
"request_path": "/fixtures",
|
|
"added": 15,
|
|
"columns": None,
|
|
},
|
|
),
|
|
(
|
|
"/fixtures/sortable",
|
|
{
|
|
"template": "table.html",
|
|
"database": "fixtures",
|
|
"table": "sortable",
|
|
"config": {"depth": "table"},
|
|
"view_name": "table",
|
|
"request_path": "/fixtures/sortable",
|
|
"added": 15,
|
|
"columns": [
|
|
"pk1",
|
|
"pk2",
|
|
"content",
|
|
"sortable",
|
|
"sortable_with_nulls",
|
|
"sortable_with_nulls_2",
|
|
"text",
|
|
],
|
|
},
|
|
),
|
|
],
|
|
)
|
|
def test_hook_extra_body_script(app_client, path, expected_extra_body_script):
|
|
r = re.compile(r"<script type=\"module\">var extra_body_script = (.*?);</script>")
|
|
json_data = r.search(app_client.get(path).text).group(1)
|
|
actual_data = json.loads(json_data)
|
|
assert expected_extra_body_script == actual_data
|
|
|
|
|
|
def test_hook_asgi_wrapper(app_client):
|
|
response = app_client.get("/fixtures")
|
|
assert "_internal, fixtures" == response.headers["x-databases"]
|
|
|
|
|
|
def test_hook_extra_template_vars(restore_working_directory):
|
|
with make_app_client(
|
|
template_dir=str(pathlib.Path(__file__).parent / "test_templates")
|
|
) as client:
|
|
response = client.get("/-/metadata")
|
|
assert response.status == 200
|
|
extra_template_vars = json.loads(
|
|
Soup(response.body, "html.parser").select("pre.extra_template_vars")[0].text
|
|
)
|
|
assert {
|
|
"template": "show_json.html",
|
|
"scope_path": "/-/metadata",
|
|
"columns": None,
|
|
} == extra_template_vars
|
|
extra_template_vars_from_awaitable = json.loads(
|
|
Soup(response.body, "html.parser")
|
|
.select("pre.extra_template_vars_from_awaitable")[0]
|
|
.text
|
|
)
|
|
assert {
|
|
"template": "show_json.html",
|
|
"awaitable": True,
|
|
"scope_path": "/-/metadata",
|
|
} == extra_template_vars_from_awaitable
|
|
|
|
|
|
def test_plugins_async_template_function(restore_working_directory):
|
|
with make_app_client(
|
|
template_dir=str(pathlib.Path(__file__).parent / "test_templates")
|
|
) as client:
|
|
response = client.get("/-/metadata")
|
|
assert response.status == 200
|
|
extra_from_awaitable_function = (
|
|
Soup(response.body, "html.parser")
|
|
.select("pre.extra_from_awaitable_function")[0]
|
|
.text
|
|
)
|
|
expected = (
|
|
sqlite3.connect(":memory:").execute("select sqlite_version()").fetchone()[0]
|
|
)
|
|
assert expected == extra_from_awaitable_function
|
|
|
|
|
|
def test_default_plugins_have_no_templates_path_or_static_path():
|
|
# The default plugins that ship with Datasette should have their static_path and
|
|
# templates_path all set to None
|
|
plugins = get_plugins()
|
|
for plugin in plugins:
|
|
if plugin["name"] in DEFAULT_PLUGINS:
|
|
assert None is plugin["static_path"]
|
|
assert None is plugin["templates_path"]
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def view_names_client(tmp_path_factory):
|
|
tmpdir = tmp_path_factory.mktemp("test-view-names")
|
|
templates = tmpdir / "templates"
|
|
templates.mkdir()
|
|
plugins = tmpdir / "plugins"
|
|
plugins.mkdir()
|
|
for template in (
|
|
"index.html",
|
|
"database.html",
|
|
"table.html",
|
|
"row.html",
|
|
"show_json.html",
|
|
"query.html",
|
|
):
|
|
(templates / template).write_text("view_name:{{ view_name }}", "utf-8")
|
|
(plugins / "extra_vars.py").write_text(
|
|
textwrap.dedent(
|
|
"""
|
|
from datasette import hookimpl
|
|
@hookimpl
|
|
def extra_template_vars(view_name):
|
|
return {"view_name": view_name}
|
|
"""
|
|
),
|
|
"utf-8",
|
|
)
|
|
db_path = str(tmpdir / "fixtures.db")
|
|
conn = sqlite3.connect(db_path)
|
|
conn.executescript(TABLES)
|
|
return _TestClient(
|
|
Datasette([db_path], template_dir=str(templates), plugins_dir=str(plugins))
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path,view_name",
|
|
(
|
|
("/", "index"),
|
|
("/fixtures", "database"),
|
|
("/fixtures/units", "table"),
|
|
("/fixtures/units/1", "row"),
|
|
("/-/metadata", "json_data"),
|
|
("/fixtures?sql=select+1", "database"),
|
|
),
|
|
)
|
|
def test_view_names(view_names_client, path, view_name):
|
|
response = view_names_client.get(path)
|
|
assert response.status == 200
|
|
assert f"view_name:{view_name}" == response.text
|
|
|
|
|
|
def test_hook_register_output_renderer_no_parameters(app_client):
|
|
response = app_client.get("/fixtures/facetable.testnone")
|
|
assert 200 == response.status
|
|
assert b"Hello" == response.body
|
|
|
|
|
|
def test_hook_register_output_renderer_all_parameters(app_client):
|
|
response = app_client.get("/fixtures/facetable.testall")
|
|
assert 200 == response.status
|
|
# Lots of 'at 0x103a4a690' in here - replace those so we can do
|
|
# an easy comparison
|
|
body = at_memory_re.sub(" at 0xXXX", response.text)
|
|
assert json.loads(body) == {
|
|
"datasette": "<datasette.app.Datasette object at 0xXXX>",
|
|
"columns": [
|
|
"pk",
|
|
"created",
|
|
"planet_int",
|
|
"on_earth",
|
|
"state",
|
|
"city_id",
|
|
"neighborhood",
|
|
"tags",
|
|
"complex_array",
|
|
"distinct_some_null",
|
|
],
|
|
"rows": [
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
"<sqlite3.Row object at 0xXXX>",
|
|
],
|
|
"sql": "select pk, created, planet_int, on_earth, state, city_id, neighborhood, tags, complex_array, distinct_some_null from facetable order by pk limit 51",
|
|
"query_name": None,
|
|
"database": "fixtures",
|
|
"table": "facetable",
|
|
"request": "<datasette.utils.asgi.Request object at 0xXXX>",
|
|
"view_name": "table",
|
|
"1+1": 2,
|
|
}
|
|
# Test that query_name is set correctly
|
|
query_response = app_client.get("/fixtures/pragma_cache_size.testall")
|
|
assert "pragma_cache_size" == json.loads(query_response.body)["query_name"]
|
|
|
|
|
|
def test_hook_register_output_renderer_custom_status_code(app_client):
|
|
response = app_client.get("/fixtures/pragma_cache_size.testall?status_code=202")
|
|
assert 202 == response.status
|
|
|
|
|
|
def test_hook_register_output_renderer_custom_content_type(app_client):
|
|
response = app_client.get(
|
|
"/fixtures/pragma_cache_size.testall?content_type=text/blah"
|
|
)
|
|
assert "text/blah" == response.headers["content-type"]
|
|
|
|
|
|
def test_hook_register_output_renderer_custom_headers(app_client):
|
|
response = app_client.get(
|
|
"/fixtures/pragma_cache_size.testall?header=x-wow:1&header=x-gosh:2"
|
|
)
|
|
assert "1" == response.headers["x-wow"]
|
|
assert "2" == response.headers["x-gosh"]
|
|
|
|
|
|
def test_hook_register_output_renderer_returning_response(app_client):
|
|
response = app_client.get("/fixtures/facetable.testresponse")
|
|
assert 200 == response.status
|
|
assert response.json == {"this_is": "json"}
|
|
|
|
|
|
def test_hook_register_output_renderer_returning_broken_value(app_client):
|
|
response = app_client.get("/fixtures/facetable.testresponse?_broken=1")
|
|
assert 500 == response.status
|
|
assert "this should break should be dict or Response" in response.text
|
|
|
|
|
|
def test_hook_register_output_renderer_can_render(app_client):
|
|
response = app_client.get("/fixtures/facetable?_no_can_render=1")
|
|
assert response.status == 200
|
|
links = (
|
|
Soup(response.body, "html.parser")
|
|
.find("p", {"class": "export-links"})
|
|
.findAll("a")
|
|
)
|
|
actual = [l["href"] for l in links]
|
|
# Should not be present because we sent ?_no_can_render=1
|
|
assert "/fixtures/facetable.testall?_labels=on" not in actual
|
|
# Check that it was passed the values we expected
|
|
assert hasattr(app_client.ds, "_can_render_saw")
|
|
assert {
|
|
"datasette": app_client.ds,
|
|
"columns": [
|
|
"pk",
|
|
"created",
|
|
"planet_int",
|
|
"on_earth",
|
|
"state",
|
|
"city_id",
|
|
"neighborhood",
|
|
"tags",
|
|
"complex_array",
|
|
"distinct_some_null",
|
|
],
|
|
"sql": "select pk, created, planet_int, on_earth, state, city_id, neighborhood, tags, complex_array, distinct_some_null from facetable order by pk limit 51",
|
|
"query_name": None,
|
|
"database": "fixtures",
|
|
"table": "facetable",
|
|
"view_name": "table",
|
|
}.items() <= app_client.ds._can_render_saw.items()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_hook_prepare_jinja2_environment(app_client):
|
|
template = app_client.ds.jinja_env.from_string(
|
|
"Hello there, {{ a|format_numeric }}", {"a": 3412341}
|
|
)
|
|
rendered = await app_client.ds.render_template(template)
|
|
assert "Hello there, 3,412,341" == rendered
|
|
|
|
|
|
def test_hook_publish_subcommand():
|
|
# This is hard to test properly, because publish subcommand plugins
|
|
# cannot be loaded using the --plugins-dir mechanism - they need
|
|
# to be installed using "pip install". So I'm cheating and taking
|
|
# advantage of the fact that cloudrun/heroku use the plugin hook
|
|
# to register themselves as default plugins.
|
|
assert ["cloudrun", "heroku"] == cli.publish.list_commands({})
|
|
|
|
|
|
def test_hook_register_facet_classes(app_client):
|
|
response = app_client.get(
|
|
"/fixtures/compound_three_primary_keys.json?_dummy_facet=1"
|
|
)
|
|
assert [
|
|
{
|
|
"name": "pk1",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet_dummy=pk1",
|
|
"type": "dummy",
|
|
},
|
|
{
|
|
"name": "pk2",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet_dummy=pk2",
|
|
"type": "dummy",
|
|
},
|
|
{
|
|
"name": "pk3",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet_dummy=pk3",
|
|
"type": "dummy",
|
|
},
|
|
{
|
|
"name": "content",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet_dummy=content",
|
|
"type": "dummy",
|
|
},
|
|
{
|
|
"name": "pk1",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet=pk1",
|
|
},
|
|
{
|
|
"name": "pk2",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet=pk2",
|
|
},
|
|
{
|
|
"name": "pk3",
|
|
"toggle_url": "http://localhost/fixtures/compound_three_primary_keys.json?_dummy_facet=1&_facet=pk3",
|
|
},
|
|
] == response.json["suggested_facets"]
|
|
|
|
|
|
def test_hook_actor_from_request(app_client):
|
|
app_client.get("/")
|
|
# Should have no actor
|
|
assert None == app_client.ds._last_request.scope["actor"]
|
|
app_client.get("/?_bot=1")
|
|
# Should have bot actor
|
|
assert {"id": "bot"} == app_client.ds._last_request.scope["actor"]
|
|
|
|
|
|
def test_hook_actor_from_request_async(app_client):
|
|
app_client.get("/")
|
|
# Should have no actor
|
|
assert None == app_client.ds._last_request.scope["actor"]
|
|
app_client.get("/?_bot2=1")
|
|
# Should have bot2 actor
|
|
assert {"id": "bot2", "1+1": 2} == app_client.ds._last_request.scope["actor"]
|
|
|
|
|
|
def test_existing_scope_actor_respected(app_client):
|
|
app_client.get("/?_actor_in_scope=1")
|
|
assert {"id": "from-scope"} == app_client.ds._last_request.scope["actor"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"action,expected",
|
|
[
|
|
("this_is_allowed", True),
|
|
("this_is_denied", False),
|
|
("this_is_allowed_async", True),
|
|
("this_is_denied_async", False),
|
|
("no_match", None),
|
|
],
|
|
)
|
|
async def test_hook_permission_allowed(app_client, action, expected):
|
|
actual = await app_client.ds.permission_allowed(
|
|
{"id": "actor"}, action, default=None
|
|
)
|
|
assert expected == actual
|
|
|
|
|
|
def test_actor_json(app_client):
|
|
assert {"actor": None} == app_client.get("/-/actor.json").json
|
|
assert {"actor": {"id": "bot2", "1+1": 2}} == app_client.get(
|
|
"/-/actor.json/?_bot2=1"
|
|
).json
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"path,body",
|
|
[
|
|
("/one/", "2"),
|
|
("/two/Ray?greeting=Hail", "Hail Ray"),
|
|
("/not-async/", "This was not async"),
|
|
],
|
|
)
|
|
def test_hook_register_routes(app_client, path, body):
|
|
response = app_client.get(path)
|
|
assert 200 == response.status
|
|
assert body == response.text
|
|
|
|
|
|
def test_hook_register_routes_post(app_client):
|
|
response = app_client.post("/post/", {"this is": "post data"}, csrftoken_from=True)
|
|
assert 200 == response.status
|
|
assert "csrftoken" in response.json
|
|
assert "post data" == response.json["this is"]
|
|
|
|
|
|
def test_hook_register_routes_csrftoken(restore_working_directory, tmpdir_factory):
|
|
templates = tmpdir_factory.mktemp("templates")
|
|
(templates / "csrftoken_form.html").write_text(
|
|
"CSRFTOKEN: {{ csrftoken() }}", "utf-8"
|
|
)
|
|
with make_app_client(template_dir=templates) as client:
|
|
response = client.get("/csrftoken-form/")
|
|
expected_token = client.ds._last_request.scope["csrftoken"]()
|
|
assert f"CSRFTOKEN: {expected_token}" == response.text
|
|
|
|
|
|
def test_hook_register_routes_asgi(app_client):
|
|
response = app_client.get("/three/")
|
|
assert {"hello": "world"} == response.json
|
|
assert "1" == response.headers["x-three"]
|
|
|
|
|
|
def test_hook_register_routes_add_message(app_client):
|
|
response = app_client.get("/add-message/")
|
|
assert 200 == response.status
|
|
assert "Added message" == response.text
|
|
decoded = app_client.ds.unsign(response.cookies["ds_messages"], "messages")
|
|
assert [["Hello from messages", 1]] == decoded
|
|
|
|
|
|
def test_hook_register_routes_render_message(restore_working_directory, tmpdir_factory):
|
|
templates = tmpdir_factory.mktemp("templates")
|
|
(templates / "render_message.html").write_text('{% extends "base.html" %}', "utf-8")
|
|
with make_app_client(template_dir=templates) as client:
|
|
response1 = client.get("/add-message/")
|
|
response2 = client.get("/render-message/", cookies=response1.cookies)
|
|
assert 200 == response2.status
|
|
assert "Hello from messages" in response2.text
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_hook_startup(app_client):
|
|
await app_client.ds.invoke_startup()
|
|
assert app_client.ds._startup_hook_fired
|
|
assert 2 == app_client.ds._startup_hook_calculation
|
|
|
|
|
|
def test_hook_canned_queries(app_client):
|
|
queries = app_client.get("/fixtures.json").json["queries"]
|
|
queries_by_name = {q["name"]: q for q in queries}
|
|
assert {
|
|
"sql": "select 2",
|
|
"name": "from_async_hook",
|
|
"private": False,
|
|
} == queries_by_name["from_async_hook"]
|
|
assert {
|
|
"sql": "select 1, 'null' as actor_id",
|
|
"name": "from_hook",
|
|
"private": False,
|
|
} == queries_by_name["from_hook"]
|
|
|
|
|
|
def test_hook_canned_queries_non_async(app_client):
|
|
response = app_client.get("/fixtures/from_hook.json?_shape=array")
|
|
assert [{"1": 1, "actor_id": "null"}] == response.json
|
|
|
|
|
|
def test_hook_canned_queries_async(app_client):
|
|
response = app_client.get("/fixtures/from_async_hook.json?_shape=array")
|
|
assert [{"2": 2}] == response.json
|
|
|
|
|
|
def test_hook_canned_queries_actor(app_client):
|
|
assert [{"1": 1, "actor_id": "bot"}] == app_client.get(
|
|
"/fixtures/from_hook.json?_bot=1&_shape=array"
|
|
).json
|
|
|
|
|
|
def test_hook_register_magic_parameters(restore_working_directory):
|
|
with make_app_client(
|
|
extra_databases={"data.db": "create table logs (line text)"},
|
|
metadata={
|
|
"databases": {
|
|
"data": {
|
|
"queries": {
|
|
"runme": {
|
|
"sql": "insert into logs (line) values (:_request_http_version)",
|
|
"write": True,
|
|
},
|
|
"get_uuid": {
|
|
"sql": "select :_uuid_new",
|
|
},
|
|
}
|
|
}
|
|
}
|
|
},
|
|
) as client:
|
|
response = client.post("/data/runme", {}, csrftoken_from=True)
|
|
assert 200 == response.status
|
|
actual = client.get("/data/logs.json?_sort_desc=rowid&_shape=array").json
|
|
assert [{"rowid": 1, "line": "1.1"}] == actual
|
|
# Now try the GET request against get_uuid
|
|
response_get = client.get("/data/get_uuid.json?_shape=array")
|
|
assert 200 == response_get.status
|
|
new_uuid = response_get.json[0][":_uuid_new"]
|
|
assert 4 == new_uuid.count("-")
|
|
|
|
|
|
def test_hook_forbidden(restore_working_directory):
|
|
with make_app_client(
|
|
extra_databases={"data2.db": "create table logs (line text)"},
|
|
metadata={"allow": {}},
|
|
) as client:
|
|
response = client.get("/")
|
|
assert 403 == response.status
|
|
response2 = client.get("/data2", allow_redirects=False)
|
|
assert 302 == response2.status
|
|
assert "/login?message=view-database" == response2.headers["Location"]
|
|
assert "view-database" == client.ds._last_forbidden_message
|
|
|
|
|
|
def test_hook_menu_links(app_client):
|
|
def get_menu_links(html):
|
|
soup = Soup(html, "html.parser")
|
|
return [
|
|
{"label": a.text, "href": a["href"]} for a in soup.find("nav").select("a")
|
|
]
|
|
|
|
response = app_client.get("/")
|
|
assert get_menu_links(response.text) == []
|
|
|
|
response_2 = app_client.get("/?_bot=1")
|
|
assert get_menu_links(response_2.text) == [
|
|
{"label": "Hello", "href": "/"},
|
|
{"label": "Hello 2", "href": "/"},
|
|
]
|
|
|
|
|
|
@pytest.mark.parametrize("table_or_view", ["facetable", "simple_view"])
|
|
def test_hook_table_actions(app_client, table_or_view):
|
|
def get_table_actions_links(html):
|
|
soup = Soup(html, "html.parser")
|
|
details = soup.find("details", {"class": "actions-menu-links"})
|
|
if details is None:
|
|
return []
|
|
return [{"label": a.text, "href": a["href"]} for a in details.select("a")]
|
|
|
|
response = app_client.get(f"/fixtures/{table_or_view}")
|
|
assert get_table_actions_links(response.text) == []
|
|
|
|
response_2 = app_client.get(f"/fixtures/{table_or_view}?_bot=1")
|
|
assert sorted(
|
|
get_table_actions_links(response_2.text), key=lambda l: l["label"]
|
|
) == [
|
|
{"label": "Database: fixtures", "href": "/"},
|
|
{"label": "From async", "href": "/"},
|
|
{"label": f"Table: {table_or_view}", "href": "/"},
|
|
]
|
|
|
|
|
|
def test_hook_database_actions(app_client):
|
|
def get_table_actions_links(html):
|
|
soup = Soup(html, "html.parser")
|
|
details = soup.find("details", {"class": "actions-menu-links"})
|
|
if details is None:
|
|
return []
|
|
return [{"label": a.text, "href": a["href"]} for a in details.select("a")]
|
|
|
|
response = app_client.get("/fixtures")
|
|
assert get_table_actions_links(response.text) == []
|
|
|
|
response_2 = app_client.get("/fixtures?_bot=1")
|
|
assert get_table_actions_links(response_2.text) == [
|
|
{"label": "Database: fixtures", "href": "/"},
|
|
]
|