/db/table/-/autocomplete?q= JSON endpoint

Needed to help implement edit foreign key reference.
This commit is contained in:
Simon Willison 2026-06-13 22:40:03 -07:00
commit b868f7d4c3
4 changed files with 385 additions and 0 deletions

View file

@ -82,6 +82,7 @@ from .views.special import (
TableSchemaView,
)
from .views.table import (
TableAutocompleteView,
TableInsertView,
TableUpsertView,
TableSetColumnTypeView,
@ -2619,6 +2620,10 @@ class Datasette:
TableFragmentView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^\/\.]+)/-/fragment$",
)
add_route(
TableAutocompleteView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^\/\.]+)/-/autocomplete$",
)
add_route(
TableDropView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^\/\.]+)/-/drop$",

View file

@ -15,6 +15,7 @@ from datasette.events import (
InsertRowsEvent,
UpsertRowsEvent,
)
from datasette.database import QueryInterrupted
from datasette import tracer
from datasette.resources import DatabaseResource, TableResource
from datasette.utils import (
@ -1111,6 +1112,155 @@ class TableFragmentView(BaseView):
return Response.html(html)
def _escape_like(value):
return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
# Returns the exclusive upper bound for an indexed prefix search:
# "abc" -> "abd", so `pk >= "abc" and pk < "abd"` covers "abc%".
# The LIKE clause is still applied separately for exact escaped-LIKE semantics.
def _prefix_range_end(value):
if not value:
return None
characters = list(value)
for i in range(len(characters) - 1, -1, -1):
if ord(characters[i]) < 0x10FFFF:
return "{}{}".format("".join(characters[:i]), chr(ord(characters[i]) + 1))
return None
def _autocomplete_like(column):
return "{} like :like escape char(92)".format(escape_sqlite(column))
def _autocomplete_prefix_like(column):
return "{} like :prefix escape char(92)".format(escape_sqlite(column))
def _autocomplete_order_by(pks, label_column, exact_pk, label_matches_first=True):
clauses = []
if exact_pk:
clauses.append(
"case when cast({} as text) = :q then 0 else 1 end".format(
escape_sqlite(pks[0])
)
)
if label_column:
label_like = _autocomplete_like(label_column)
if label_matches_first:
clauses.append("case when {} then 0 else 1 end".format(label_like))
clauses.append(
"case when {} then length(cast({} as text)) end".format(
label_like, escape_sqlite(label_column)
)
)
else:
clauses.append("length(cast({} as text))".format(escape_sqlite(pks[0])))
clauses.extend(escape_sqlite(pk) for pk in pks)
return ", ".join(clauses)
def _autocomplete_pk_order_by(pks):
return ", ".join(escape_sqlite(pk) for pk in pks)
def _autocomplete_response_rows(rows, pks, label_column):
response_rows = []
for row in rows:
item = {"pks": {pk: row[pk] for pk in pks}}
if label_column:
item["label"] = row[label_column]
response_rows.append(item)
return response_rows
class TableAutocompleteView(BaseView):
name = "table-autocomplete"
async def get(self, request):
resolved = await self.ds.resolve_table(request)
if resolved.is_view:
raise BadRequest("Autocomplete is only available for tables")
db = resolved.db
database_name = db.name
table_name = resolved.table
visible, _ = await self.ds.check_visibility(
request.actor,
action="view-table",
resource=TableResource(database=database_name, table=table_name),
)
if not visible:
raise Forbidden("You do not have permission to view this table")
pks = await db.primary_keys(table_name)
if not pks:
pks = ["rowid"]
label_column = await db.label_column_for_table(table_name)
select_columns = list(
dict.fromkeys(pks + ([label_column] if label_column else []))
)
select_sql = ", ".join(escape_sqlite(column) for column in select_columns)
q = request.args.get("q") or ""
if not q:
return Response.json({"rows": []})
params = {
"q": q,
"like": "%{}%".format(_escape_like(q)),
"prefix": "{}%".format(_escape_like(q)),
}
like_columns = pks[:]
if label_column and label_column not in like_columns:
like_columns.append(label_column)
where_sql = " or ".join(_autocomplete_like(column) for column in like_columns)
exact_pk = len(pks) == 1
sql = """
select {select_sql}
from {table}
where {where}
order by {order_by}
limit 10
""".format(
select_sql=select_sql,
table=escape_sqlite(table_name),
where=where_sql,
order_by=_autocomplete_order_by(pks, label_column, exact_pk),
)
try:
results = await db.execute(sql, params, custom_time_limit=500)
except QueryInterrupted:
fallback_where = _autocomplete_prefix_like(pks[0])
prefix_end = _prefix_range_end(q)
if prefix_end:
params["prefix_end"] = prefix_end
first_pk = escape_sqlite(pks[0])
fallback_where = (
"{first_pk} >= :q and {first_pk} < :prefix_end and {like}"
).format(first_pk=first_pk, like=fallback_where)
fallback_sql = """
select {select_sql}
from {table}
where {where}
order by {order_by}
limit 10
""".format(
select_sql=select_sql,
table=escape_sqlite(table_name),
where=fallback_where,
order_by=_autocomplete_pk_order_by(pks),
)
try:
results = await db.execute(fallback_sql, params, custom_time_limit=500)
except QueryInterrupted:
return Response.json({"rows": []})
return Response.json(
{"rows": _autocomplete_response_rows(results.rows, pks, label_column)}
)
async def _columns_to_select(table_columns, pks, request):
columns = list(table_columns)
if "_col" in request.args:

View file

@ -1201,6 +1201,48 @@ The following extras are available for arbitrary SQL query responses and stored,
.. [[[end]]]
.. _TableAutocompleteView:
Table autocomplete
------------------
The ``/<database>/<table>/-/autocomplete`` endpoint returns up to 10 primary key
matches for a table, intended for building autocomplete interfaces such as
foreign key pickers.
::
GET /<database>/<table>/-/autocomplete?q=search
The ``q`` parameter is required. If it is omitted or blank, the endpoint returns
an empty ``"rows"`` list.
The response includes a ``"pks"`` object containing the primary key value or
values for each row. If Datasette can detect a label column, or one has been
configured using ``label_column``, each row will also include ``"label"``:
.. code-block:: json
{
"rows": [
{
"pks": {
"id": 1
},
"label": "Example row"
}
]
}
The endpoint searches the primary key column or columns and the label column
using escaped SQL ``LIKE`` queries. A single-column primary key exact match is
returned first. Other matches are ordered by the shortest matching label value
where a label column is available.
The initial search runs with a 500ms time limit. If that query times out,
Datasette falls back to a prefix match against the first primary key column so
SQLite can use the primary key index.
.. _table_arguments:
Table arguments

188
tests/test_autocomplete.py Normal file
View file

@ -0,0 +1,188 @@
import pytest
from datasette.app import Datasette
@pytest.mark.asyncio
async def test_autocomplete_single_pk_exact_match_and_label_order():
ds = Datasette(memory=True)
db = ds.add_memory_database("autocomplete_single")
await db.execute_write_script("""
create table people (
id integer primary key,
name text
);
insert into people (id, name) values
(2, 'Longer non-label pk match'),
(20, '2'),
(21, '22'),
(200, 'A'),
(3, 'A label containing 2');
""")
response = await ds.client.get("/autocomplete_single/people/-/autocomplete?q=2")
assert response.status_code == 200
assert response.json() == {
"rows": [
{"pks": {"id": 2}, "label": "Longer non-label pk match"},
{"pks": {"id": 20}, "label": "2"},
{"pks": {"id": 21}, "label": "22"},
{"pks": {"id": 3}, "label": "A label containing 2"},
{"pks": {"id": 200}, "label": "A"},
]
}
@pytest.mark.asyncio
async def test_autocomplete_blank_q_returns_no_results():
ds = Datasette(memory=True)
db = ds.add_memory_database("autocomplete_blank")
await db.execute_write_script("""
create table people (
id integer primary key,
name text
);
insert into people (id, name) values
(1, 'Alice'),
(2, 'Bob');
""")
response = await ds.client.get("/autocomplete_blank/people/-/autocomplete?q=")
assert response.status_code == 200
assert response.json() == {"rows": []}
@pytest.mark.asyncio
async def test_autocomplete_escapes_like_characters():
ds = Datasette(memory=True)
db = ds.add_memory_database("autocomplete_escape")
await db.execute_write_script("""
create table tags (
id integer primary key,
name text
);
insert into tags (id, name) values
(1, '100% real'),
(2, '100X real'),
(3, '100 percent real');
""")
response = await ds.client.get("/autocomplete_escape/tags/-/autocomplete?q=100%25")
assert response.status_code == 200
assert response.json() == {
"rows": [
{"pks": {"id": 1}, "label": "100% real"},
]
}
@pytest.mark.asyncio
async def test_autocomplete_compound_pk_searches_all_pk_columns():
ds = Datasette(memory=True)
db = ds.add_memory_database("autocomplete_compound")
await db.execute_write_script("""
create table places (
country text,
code text,
name text,
primary key (country, code)
);
insert into places (country, code, name) values
('us', 'ca', 'California'),
('ca', 'bc', 'British Columbia'),
('mx', 'ca', 'Campeche'),
('zz', 'zz', 'Nothing');
""")
response = await ds.client.get("/autocomplete_compound/places/-/autocomplete?q=ca")
assert response.status_code == 200
assert response.json() == {
"rows": [
{"pks": {"country": "mx", "code": "ca"}, "label": "Campeche"},
{"pks": {"country": "us", "code": "ca"}, "label": "California"},
{"pks": {"country": "ca", "code": "bc"}, "label": "British Columbia"},
]
}
@pytest.mark.asyncio
async def test_autocomplete_primary_key_called_label():
ds = Datasette(
memory=True,
config={
"databases": {
"autocomplete_label_pk": {
"tables": {"things": {"label_column": "name"}}
}
}
},
)
db = ds.add_memory_database("autocomplete_label_pk")
await db.execute_write_script("""
create table things (
label text primary key,
name text
);
insert into things (label, name) values
('abc', 'Display value'),
('def', 'Other value');
""")
response = await ds.client.get("/autocomplete_label_pk/things/-/autocomplete?q=abc")
assert response.status_code == 200
assert response.json() == {
"rows": [
{"pks": {"label": "abc"}, "label": "Display value"},
]
}
@pytest.mark.asyncio
async def test_autocomplete_timeout_uses_prefix_fallback():
ds = Datasette(
memory=True,
config={
"databases": {
"autocomplete_timeout": {"tables": {"things": {"label_column": "name"}}}
}
},
settings={
"num_sql_threads": 1,
"sql_time_limit_ms": 1,
},
)
db = ds.add_memory_database("autocomplete_timeout")
await db.execute_write_script("""
create table things (
id text primary key,
name text
);
insert into things (id, name) values
('other-000001', 'item-1999 label-only match');
""")
def insert_rows(conn):
conn.executemany(
"insert into things (id, name) values (?, ?)",
((f"item-{i:06d}", f"name {i:06d}") for i in range(200_000)),
)
await db.execute_write_fn(insert_rows)
response = await ds.client.get(
"/autocomplete_timeout/things/-/autocomplete?q=item-1999"
)
assert response.status_code == 200
data = response.json()
assert data == {
"rows": [
{"pks": {"id": f"item-1999{i:02d}"}, "label": f"name 1999{i:02d}"}
for i in range(10)
]
}