mirror of
https://github.com/simonw/datasette.git
synced 2026-06-04 16:16:59 +02:00
305 lines
9.2 KiB
Python
305 lines
9.2 KiB
Python
import httpx
|
|
import os
|
|
import pathlib
|
|
import pytest
|
|
import pytest_asyncio
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datasette import Event, hookimpl
|
|
|
|
|
|
try:
|
|
import pysqlite3 as sqlite3
|
|
except ImportError:
|
|
import sqlite3
|
|
|
|
UNDOCUMENTED_PERMISSIONS = {
|
|
"this_is_allowed",
|
|
"this_is_denied",
|
|
"this_is_allowed_async",
|
|
"this_is_denied_async",
|
|
"no_match",
|
|
# Test actions from test_hook_register_actions_with_custom_resources
|
|
"manage_documents",
|
|
"view_document_collection",
|
|
"view_document",
|
|
}
|
|
|
|
_ds_client = None
|
|
_ds_instance = None
|
|
|
|
|
|
def wait_until_responds(url, timeout=5.0, client=httpx, **kwargs):
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
try:
|
|
client.get(url, **kwargs)
|
|
return
|
|
except httpx.ConnectError:
|
|
time.sleep(0.1)
|
|
raise AssertionError("Timed out waiting for {} to respond".format(url))
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def ds_client():
|
|
from datasette.app import Datasette
|
|
from datasette.database import Database
|
|
from .fixtures import CONFIG, METADATA, PLUGINS_DIR
|
|
import secrets
|
|
|
|
global _ds_client, _ds_instance
|
|
if _ds_client is not None:
|
|
return _ds_client
|
|
|
|
ds = Datasette(
|
|
metadata=METADATA,
|
|
config=CONFIG,
|
|
plugins_dir=PLUGINS_DIR,
|
|
settings={
|
|
"default_page_size": 50,
|
|
"max_returned_rows": 100,
|
|
"sql_time_limit_ms": 200,
|
|
"facet_suggest_time_limit_ms": 200, # Up from 50 default
|
|
# Default is 3 but this results in "too many open files"
|
|
# errors when running the full test suite:
|
|
"num_sql_threads": 1,
|
|
},
|
|
)
|
|
from .fixtures import TABLES, TABLE_PARAMETERIZED_SQL
|
|
|
|
# Use a unique memory_name to avoid collisions between different
|
|
# Datasette instances in the same process, but use "fixtures" for routing
|
|
unique_memory_name = f"fixtures_{secrets.token_hex(8)}"
|
|
db = ds.add_database(Database(ds, memory_name=unique_memory_name), name="fixtures")
|
|
ds.remove_database("_memory")
|
|
|
|
def prepare(conn):
|
|
if not conn.execute("select count(*) from sqlite_master").fetchone()[0]:
|
|
conn.executescript(TABLES)
|
|
for sql, params in TABLE_PARAMETERIZED_SQL:
|
|
with conn:
|
|
conn.execute(sql, params)
|
|
|
|
await db.execute_write_fn(prepare)
|
|
await ds.invoke_startup()
|
|
_ds_client = ds.client
|
|
_ds_instance = ds
|
|
return _ds_client
|
|
|
|
|
|
def pytest_report_header(config):
|
|
conn = sqlite3.connect(":memory:")
|
|
version = conn.execute("select sqlite_version()").fetchone()[0]
|
|
conn.close()
|
|
return "SQLite: {}".format(version)
|
|
|
|
|
|
def pytest_configure(config):
|
|
import sys
|
|
|
|
sys._called_from_test = True
|
|
|
|
|
|
def pytest_unconfigure(config):
|
|
import sys
|
|
|
|
del sys._called_from_test
|
|
|
|
# Clean up the global ds_client fixture
|
|
global _ds_instance
|
|
if _ds_instance is not None:
|
|
# Close databases first (while executor is still running)
|
|
for db in _ds_instance.databases.values():
|
|
db.close()
|
|
if hasattr(_ds_instance, "_internal_database"):
|
|
_ds_instance._internal_database.close()
|
|
# Then shut down executor
|
|
if _ds_instance.executor is not None:
|
|
_ds_instance.executor.shutdown(wait=True)
|
|
_ds_instance = None
|
|
|
|
|
|
def pytest_collection_modifyitems(items):
|
|
# Ensure test_cli.py and test_black.py and test_inspect.py run first before any asyncio code kicks in
|
|
move_to_front(items, "test_cli")
|
|
move_to_front(items, "test_black")
|
|
move_to_front(items, "test_inspect_cli")
|
|
move_to_front(items, "test_serve_with_get")
|
|
move_to_front(items, "test_serve_with_get_exit_code_for_error")
|
|
move_to_front(items, "test_inspect_cli_writes_to_file")
|
|
move_to_front(items, "test_spatialite_error_if_attempt_to_open_spatialite")
|
|
move_to_front(items, "test_package")
|
|
move_to_front(items, "test_package_with_port")
|
|
|
|
|
|
def move_to_front(items, test_name):
|
|
test = [fn for fn in items if fn.name == test_name]
|
|
if test:
|
|
items.insert(0, items.pop(items.index(test[0])))
|
|
|
|
|
|
@pytest.fixture
|
|
def restore_working_directory(tmpdir, request):
|
|
try:
|
|
previous_cwd = os.getcwd()
|
|
except OSError:
|
|
# https://github.com/simonw/datasette/issues/1361
|
|
previous_cwd = None
|
|
tmpdir.chdir()
|
|
|
|
def return_to_previous():
|
|
os.chdir(previous_cwd)
|
|
|
|
if previous_cwd is not None:
|
|
request.addfinalizer(return_to_previous)
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
|
|
def check_actions_are_documented():
|
|
from datasette.plugins import pm
|
|
|
|
content = (
|
|
pathlib.Path(__file__).parent.parent / "docs" / "authentication.rst"
|
|
).read_text()
|
|
permissions_re = re.compile(r"\.\. _actions_([^\s:]+):")
|
|
documented_actions = set(permissions_re.findall(content)).union(
|
|
UNDOCUMENTED_PERMISSIONS
|
|
)
|
|
|
|
def before(hook_name, hook_impls, kwargs):
|
|
if hook_name == "permission_resources_sql":
|
|
datasette = kwargs["datasette"]
|
|
assert kwargs["action"] in datasette.actions, (
|
|
"'{}' has not been registered with register_actions()".format(
|
|
kwargs["action"]
|
|
)
|
|
+ " (or maybe a test forgot to do await ds.invoke_startup())"
|
|
)
|
|
action = kwargs.get("action").replace("-", "_")
|
|
assert (
|
|
action in documented_actions
|
|
), "Undocumented permission action: {}".format(action)
|
|
|
|
pm.add_hookcall_monitoring(
|
|
before=before, after=lambda outcome, hook_name, hook_impls, kwargs: None
|
|
)
|
|
|
|
|
|
class TrackEventPlugin:
|
|
__name__ = "TrackEventPlugin"
|
|
|
|
@dataclass
|
|
class OneEvent(Event):
|
|
name = "one"
|
|
|
|
extra: str
|
|
|
|
@hookimpl
|
|
def register_events(self, datasette):
|
|
async def inner():
|
|
return [self.OneEvent]
|
|
|
|
return inner
|
|
|
|
@hookimpl
|
|
def track_event(self, datasette, event):
|
|
datasette._tracked_events = getattr(datasette, "_tracked_events", [])
|
|
datasette._tracked_events.append(event)
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
|
|
def install_event_tracking_plugin():
|
|
from datasette.plugins import pm
|
|
|
|
pm.register(TrackEventPlugin(), name="TrackEventPlugin")
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def ds_localhost_http_server():
|
|
ds_proc = subprocess.Popen(
|
|
[sys.executable, "-m", "datasette", "--memory", "-p", "8041"],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
# Avoid FileNotFoundError: [Errno 2] No such file or directory:
|
|
cwd=tempfile.gettempdir(),
|
|
)
|
|
wait_until_responds("http://localhost:8041/")
|
|
# Check it started successfully
|
|
assert not ds_proc.poll(), ds_proc.stdout.read().decode("utf-8")
|
|
yield ds_proc
|
|
# Shut it down at the end of the pytest session
|
|
ds_proc.terminate()
|
|
ds_proc.wait()
|
|
if ds_proc.stdout:
|
|
ds_proc.stdout.close()
|
|
|
|
|
|
def wait_until_uds_responds(uds_path, timeout=5.0):
|
|
"""Wait for a Unix domain socket to accept connections."""
|
|
import socket as socket_module
|
|
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
sock = socket_module.socket(socket_module.AF_UNIX, socket_module.SOCK_STREAM)
|
|
try:
|
|
sock.connect(uds_path)
|
|
# Connection successful, now close and return
|
|
sock.close()
|
|
return
|
|
except (ConnectionRefusedError, FileNotFoundError):
|
|
sock.close()
|
|
time.sleep(0.1)
|
|
raise AssertionError("Timed out waiting for {} to respond".format(uds_path))
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def ds_unix_domain_socket_server(tmp_path_factory):
|
|
# This used to use tmp_path_factory.mktemp("uds") but that turned out to
|
|
# produce paths that were too long to use as UDS on macOS, see
|
|
# https://github.com/simonw/datasette/issues/1407 - so I switched to
|
|
# using tempfile.gettempdir()
|
|
uds = str(pathlib.Path(tempfile.gettempdir()) / "datasette.sock")
|
|
ds_proc = subprocess.Popen(
|
|
[sys.executable, "-m", "datasette", "--memory", "--uds", uds],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
cwd=tempfile.gettempdir(),
|
|
)
|
|
# Poll until available using raw socket to avoid httpx connection pool leaks
|
|
wait_until_uds_responds(uds)
|
|
# Check it started successfully
|
|
assert not ds_proc.poll(), ds_proc.stdout.read().decode("utf-8")
|
|
yield ds_proc, uds
|
|
# Shut it down at the end of the pytest session
|
|
ds_proc.terminate()
|
|
ds_proc.wait()
|
|
if ds_proc.stdout:
|
|
ds_proc.stdout.close()
|
|
|
|
|
|
# Import fixtures from fixtures.py to make them available
|
|
from .fixtures import ( # noqa: E402, F401
|
|
app_client,
|
|
app_client_base_url_prefix,
|
|
app_client_conflicting_database_names,
|
|
app_client_csv_max_mb_one,
|
|
app_client_immutable_and_inspect_file,
|
|
app_client_larger_cache_size,
|
|
app_client_no_files,
|
|
app_client_returned_rows_matches_page_size,
|
|
app_client_shorter_time_limit,
|
|
app_client_two_attached_databases,
|
|
app_client_two_attached_databases_crossdb_enabled,
|
|
app_client_two_attached_databases_one_immutable,
|
|
app_client_with_cors,
|
|
app_client_with_dot,
|
|
app_client_with_trace,
|
|
generate_compound_rows,
|
|
generate_sortable_rows,
|
|
make_app_client,
|
|
TEMP_PLUGIN_SECRET_FILE,
|
|
)
|