mirror of
https://github.com/simonw/datasette.git
synced 2026-06-17 22:37:48 +02:00
79 lines
2.1 KiB
Python
79 lines
2.1 KiB
Python
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from datasette.fixtures import write_fixture_database
|
|
|
|
|
|
def find_free_port():
    """Return a port number on 127.0.0.1 chosen by the operating system.

    A throwaway socket is bound to port 0 so the OS assigns the port. The
    socket is closed before returning, so the port is only known to have been
    free at the moment of the call.
    """
    with socket.socket() as listener:
        listener.bind(("127.0.0.1", 0))
        # getsockname() yields (host, port) for this socket; only the port matters.
        _host, port = listener.getsockname()
    return port
|
|
|
|
|
|
def wait_for_server(process, url, timeout=10):
    """Poll ``url`` until it answers with a status below 500.

    Args:
        process: The server process; must provide ``poll()`` and
            ``communicate()`` (as ``subprocess.Popen`` does).
        url: Address to request on every polling attempt.
        timeout: Seconds to keep polling before giving up.

    Raises:
        AssertionError: If the process exits while being polled (the message
            carries its captured stdout/stderr), or if no status below 500
            was received before the deadline (the message carries the last
            observed error).
    """
    give_up_at = time.monotonic() + timeout
    failure = None
    while True:
        if time.monotonic() >= give_up_at:
            break

        # A dead process will never answer; surface its output right away.
        if process.poll() is not None:
            out, err = process.communicate()
            message = (
                "Datasette server exited early\n"
                f"stdout:\n{out}\n"
                f"stderr:\n{err}"
            )
            raise AssertionError(message)

        try:
            reply = httpx.get(url, timeout=1.0)
            if reply.status_code < 500:
                return
            failure = f"HTTP {reply.status_code}: {reply.text[:200]}"
        except httpx.HTTPError as ex:
            # Remember the error and retry after a short pause.
            failure = repr(ex)

        time.sleep(0.1)

    raise AssertionError(f"Timed out waiting for {url}: {failure}")
|
|
|
|
|
|
@pytest.fixture
def datasette_server(tmp_path):
    """Run Datasette in a subprocess and yield the base URL it listens on.

    A database file is written to ``tmp_path / "fixtures.db"`` via
    ``write_fixture_database`` and served with ``python -m datasette`` on
    127.0.0.1 at a port obtained from ``find_free_port()``. The fixture
    yields only after ``wait_for_server`` has returned.

    On teardown the process is terminated, and killed if it has not exited
    within 5 seconds.

    Args:
        tmp_path: pytest's per-test temporary directory.

    Yields:
        str: ``http://127.0.0.1:<port>/``.
    """
    db_path = tmp_path / "fixtures.db"
    write_fixture_database(str(db_path))
    port = find_free_port()
    command = [
        sys.executable,
        "-m",
        "datasette",
        str(db_path),
        "--host",
        "127.0.0.1",
        "--port",
        str(port),
        "--setting",
        "num_sql_threads",
        "1",
    ]
    # Popen is used as a context manager so the stdout/stderr pipes are closed
    # on exit. terminate()/wait() alone leave them open, which leaks file
    # descriptors and emits ResourceWarning for every test using this fixture.
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    ) as process:
        url = f"http://127.0.0.1:{port}/"
        try:
            wait_for_server(process, url)
            yield url
        finally:
            # Ask for a clean shutdown first; escalate to kill if it stalls.
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
|
|
|
|
|
|
@pytest.mark.playwright
def test_datasette_homepage_contains_datasette(page, datasette_server):
    """The page served at the server's base URL shows the text "Datasette"."""
    page.goto(datasette_server)
    body_text = page.locator("body").inner_text()
    assert "Datasette" in body_text
|