from datasette.app import Datasette
from datasette.fixtures import (
EXTRA_DATABASE_SQL,
write_extra_database,
write_fixture_database,
)
from datasette.utils.testing import TestClient
import click
import contextlib
import json
import os
import pathlib
import pytest
import tempfile
import textwrap
# This temp file is used by one of the plugin config tests
TEMP_PLUGIN_SECRET_FILE = os.path.join(tempfile.gettempdir(), "plugin-secret")
PLUGINS_DIR = str(pathlib.Path(__file__).parent / "plugins")
EXPECTED_PLUGINS = [
{
"name": "messages_output_renderer.py",
"static": False,
"templates": False,
"version": None,
"hooks": ["register_output_renderer"],
},
{
"name": "my_plugin.py",
"static": False,
"templates": False,
"version": None,
"hooks": [
"actor_from_request",
"asgi_wrapper",
"database_actions",
"extra_body_script",
"extra_css_urls",
"extra_js_urls",
"extra_template_vars",
"forbidden",
"homepage_actions",
"menu_links",
"permission_resources_sql",
"prepare_connection",
"prepare_jinja2_environment",
"query_actions",
"register_actions",
"register_facet_classes",
"register_magic_parameters",
"register_routes",
"register_token_handler",
"render_cell",
"row_actions",
"startup",
"table_actions",
"view_actions",
],
},
{
"name": "my_plugin_2.py",
"static": False,
"templates": False,
"version": None,
"hooks": [
"actor_from_request",
"asgi_wrapper",
"extra_js_urls",
"extra_template_vars",
"handle_exception",
"menu_links",
"prepare_jinja2_environment",
"register_routes",
"render_cell",
"startup",
"table_actions",
],
},
{
"name": "register_output_renderer.py",
"static": False,
"templates": False,
"version": None,
"hooks": ["register_output_renderer"],
},
{
"name": "sleep_sql_function.py",
"static": False,
"templates": False,
"version": None,
"hooks": ["prepare_connection"],
},
{
"name": "view_name.py",
"static": False,
"templates": False,
"version": None,
"hooks": ["extra_template_vars"],
},
]
@contextlib.contextmanager
def make_app_client(
sql_time_limit_ms=None,
max_returned_rows=None,
cors=False,
memory=False,
settings=None,
filename="fixtures.db",
is_immutable=False,
extra_databases=None,
inspect_data=None,
static_mounts=None,
template_dir=None,
config=None,
metadata=None,
crossdb=False,
):
with tempfile.TemporaryDirectory() as tmpdir:
filepath = os.path.join(tmpdir, filename)
if is_immutable:
files = []
immutables = [filepath]
else:
files = [filepath]
immutables = []
write_fixture_database(filepath)
if extra_databases is not None:
for extra_filename, extra_sql in extra_databases.items():
extra_filepath = os.path.join(tmpdir, extra_filename)
if extra_sql == EXTRA_DATABASE_SQL:
write_extra_database(extra_filepath)
else:
from datasette.utils.sqlite import sqlite3
c2 = sqlite3.connect(extra_filepath)
c2.executescript(extra_sql)
c2.close()
# Insert at start to help test /-/databases ordering:
files.insert(0, extra_filepath)
os.chdir(os.path.dirname(filepath))
settings = settings or {}
for key, value in {
"default_page_size": 50,
"max_returned_rows": max_returned_rows or 100,
"sql_time_limit_ms": sql_time_limit_ms or 200,
# Default is 3 but this results in "too many open files"
# errors when running the full test suite:
"num_sql_threads": 1,
}.items():
if key not in settings:
settings[key] = value
ds = Datasette(
files,
immutables=immutables,
memory=memory,
cors=cors,
metadata=metadata or METADATA,
config=config or CONFIG,
plugins_dir=PLUGINS_DIR,
settings=settings,
inspect_data=inspect_data,
static_mounts=static_mounts,
template_dir=template_dir,
crossdb=crossdb,
)
yield TestClient(ds)
# Close as many database connections as possible
# to try and avoid too many open files error
for db in ds.databases.values():
if not db.is_memory:
db.close()
@pytest.fixture(scope="session")
def app_client():
with make_app_client() as client:
yield client
@pytest.fixture(scope="session")
def app_client_no_files():
ds = Datasette([])
yield TestClient(ds)
for db in ds.databases.values():
db.close()
@pytest.fixture(scope="session")
def app_client_base_url_prefix():
with make_app_client(settings={"base_url": "/prefix/"}) as client:
yield client
@pytest.fixture(scope="session")
def app_client_two_attached_databases():
with make_app_client(
extra_databases={"extra database.db": EXTRA_DATABASE_SQL}
) as client:
yield client
@pytest.fixture(scope="session")
def app_client_two_attached_databases_crossdb_enabled():
with make_app_client(
extra_databases={"extra database.db": EXTRA_DATABASE_SQL},
crossdb=True,
) as client:
yield client
@pytest.fixture(scope="session")
def app_client_conflicting_database_names():
with make_app_client(
extra_databases={"foo.db": EXTRA_DATABASE_SQL, "foo-bar.db": EXTRA_DATABASE_SQL}
) as client:
yield client
@pytest.fixture(scope="session")
def app_client_two_attached_databases_one_immutable():
with make_app_client(
is_immutable=True, extra_databases={"extra database.db": EXTRA_DATABASE_SQL}
) as client:
yield client
@pytest.fixture(scope="session")
def app_client_with_trace():
with make_app_client(settings={"trace_debug": True}, is_immutable=True) as client:
yield client
@pytest.fixture(scope="session")
def app_client_shorter_time_limit():
with make_app_client(20) as client:
yield client
@pytest.fixture(scope="session")
def app_client_returned_rows_matches_page_size():
with make_app_client(max_returned_rows=50) as client:
yield client
@pytest.fixture(scope="session")
def app_client_larger_cache_size():
with make_app_client(settings={"cache_size_kb": 2500}) as client:
yield client
@pytest.fixture(scope="session")
def app_client_csv_max_mb_one():
with make_app_client(settings={"max_csv_mb": 1}) as client:
yield client
@pytest.fixture(scope="session")
def app_client_with_dot():
with make_app_client(filename="fixtures.dot.db") as client:
yield client
@pytest.fixture(scope="session")
def app_client_with_cors():
with make_app_client(is_immutable=True, cors=True) as client:
yield client
@pytest.fixture(scope="session")
def app_client_immutable_and_inspect_file():
inspect_data = {"fixtures": {"tables": {"sortable": {"count": 100}}}}
with make_app_client(is_immutable=True, inspect_data=inspect_data) as client:
yield client
CONFIG = {
"plugins": {
"name-of-plugin": {"depth": "root"},
"env-plugin": {"foo": {"$env": "FOO_ENV"}},
"env-plugin-list": [{"in_a_list": {"$env": "FOO_ENV"}}],
"file-plugin": {"foo": {"$file": TEMP_PLUGIN_SECRET_FILE}},
},
"databases": {
"fixtures": {
"plugins": {"name-of-plugin": {"depth": "database"}},
"tables": {
"simple_primary_key": {
"plugins": {
"name-of-plugin": {
"depth": "table",
"special": "this-is-simple_primary_key",
}
},
},
"sortable": {
"plugins": {"name-of-plugin": {"depth": "table"}},
},
},
"queries": {
"𝐜𝐢𝐭𝐢𝐞𝐬": "select id, name from facet_cities order by id limit 1;",
"pragma_cache_size": "PRAGMA cache_size;",
"magic_parameters": {
"sql": "select :_header_user_agent as user_agent, :_now_datetime_utc as datetime",
},
"neighborhood_search": {
"sql": textwrap.dedent("""
select _neighborhood, facet_cities.name, state
from facetable
join facet_cities
on facetable._city_id = facet_cities.id
where _neighborhood like '%' || :text || '%'
order by _neighborhood;
"""),
"title": "Search neighborhoods",
"description_html": "Demonstrating simple like search",
"fragment": "fragment-goes-here",
"hide_sql": True,
},
},
}
},
"extra_css_urls": ["/static/extra-css-urls.css"],
}
METADATA = {
"title": "Datasette Fixtures",
"description_html": 'An example SQLite database demonstrating Datasette. Sign in as root user',
"license": "Apache License 2.0",
"license_url": "https://github.com/simonw/datasette/blob/main/LICENSE",
"source": "tests/fixtures.py",
"source_url": "https://github.com/simonw/datasette/blob/main/tests/fixtures.py",
"about": "About Datasette",
"about_url": "https://github.com/simonw/datasette",
"databases": {
"fixtures": {
"description": "Test tables description",
"tables": {
"simple_primary_key": {
"description_html": "Simple primary key",
"title": "This HTML is escaped",
},
"sortable": {
"sortable_columns": [
"sortable",
"sortable_with_nulls",
"sortable_with_nulls_2",
"text",
],
},
"no_primary_key": {"sortable_columns": [], "hidden": True},
"primary_key_multiple_columns_explicit_label": {
"label_column": "content2"
},
"simple_view": {"sortable_columns": ["content"]},
"searchable_view_configured_by_metadata": {
"fts_table": "searchable_fts",
"fts_pk": "pk",
},
"roadside_attractions": {
"columns": {
"name": "The name of the attraction",
"address": "The street address for the attraction",
}
},
"attraction_characteristic": {"sort_desc": "pk"},
"facet_cities": {"sort": "name"},
"paginated_view": {"size": 25},
},
}
},
}
def assert_permissions_checked(datasette, actions):
# actions is a list of "action" or (action, resource) tuples
for action in actions:
if isinstance(action, str):
resource = None
else:
action, resource = action
# Convert PermissionCheck dataclass to old resource format for comparison
def check_matches(pc, action, resource):
if pc.action != action:
return False
# Convert parent/child to old resource format
if pc.parent and pc.child:
pc_resource = (pc.parent, pc.child)
elif pc.parent:
pc_resource = pc.parent
else:
pc_resource = None
return pc_resource == resource
assert [
pc
for pc in datasette._permission_checks
if check_matches(pc, action, resource)
], """Missing expected permission check: action={}, resource={}
Permission checks seen: {}
""".format(
action,
resource,
json.dumps(
[
{
"action": pc.action,
"parent": pc.parent,
"child": pc.child,
"result": pc.result,
}
for pc in datasette._permission_checks
],
indent=4,
),
)
@click.command()
@click.argument(
"db_filename",
default="fixtures.db",
type=click.Path(file_okay=True, dir_okay=False),
)
@click.argument("config", required=False)
@click.argument("metadata", required=False)
@click.argument(
"plugins_path", type=click.Path(file_okay=False, dir_okay=True), required=False
)
@click.option(
"--recreate",
is_flag=True,
default=False,
help="Delete and recreate database if it exists",
)
@click.option(
"--extra-db-filename",
type=click.Path(file_okay=True, dir_okay=False),
help="Write out second test DB to this file",
)
def cli(db_filename, config, metadata, plugins_path, recreate, extra_db_filename):
"""Write out the fixtures database used by Datasette's test suite"""
if metadata and not metadata.endswith(".json"):
raise click.ClickException("Metadata should end with .json")
if not db_filename.endswith(".db"):
raise click.ClickException("Database file should end with .db")
if pathlib.Path(db_filename).exists():
if not recreate:
raise click.ClickException(
f"{db_filename} already exists, use --recreate to reset it"
)
else:
pathlib.Path(db_filename).unlink()
write_fixture_database(db_filename)
print(f"Test tables written to {db_filename}")
if metadata:
with open(metadata, "w") as fp:
fp.write(json.dumps(METADATA, indent=4))
print(f"- metadata written to {metadata}")
if config:
with open(config, "w") as fp:
fp.write(json.dumps(CONFIG, indent=4))
print(f"- config written to {config}")
if plugins_path:
path = pathlib.Path(plugins_path)
if not path.exists():
path.mkdir()
test_plugins = pathlib.Path(__file__).parent / "plugins"
for filepath in test_plugins.glob("*.py"):
newpath = path / filepath.name
newpath.write_text(filepath.read_text())
print(f" Wrote plugin: {newpath}")
if extra_db_filename:
if pathlib.Path(extra_db_filename).exists():
if not recreate:
raise click.ClickException(
f"{extra_db_filename} already exists, use --recreate to reset it"
)
else:
pathlib.Path(extra_db_filename).unlink()
write_extra_database(extra_db_filename)
print(f"Test tables written to {extra_db_filename}")
if __name__ == "__main__":
cli()