/db/table/-/foreign-key-suggestions API

Improved version of the implementation in datasette-edit-schema
This commit is contained in:
Simon Willison 2026-06-17 14:47:25 -07:00
commit 2900efb32d
4 changed files with 469 additions and 2 deletions

View file

@ -49,7 +49,11 @@ from .views.database import (
DatabaseView,
QueryView,
)
from .views.table_create_alter import TableAlterView, TableCreateView
from .views.table_create_alter import (
TableAlterView,
TableCreateView,
TableForeignKeySuggestionsView,
)
from .views.execute_write import ExecuteWriteAnalyzeView, ExecuteWriteView
from .views.stored_queries import (
QueryCreateAnalyzeView,
@ -2630,6 +2634,10 @@ class Datasette:
TableAlterView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^\/\.]+)/-/alter$",
)
add_route(
TableForeignKeySuggestionsView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^\/\.]+)/-/foreign-key-suggestions$",
)
add_route(
TableSetColumnTypeView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^\/\.]+)/-/set-column-type$",

View file

@ -1,7 +1,9 @@
import json
import re
import time
from typing import Annotated, Any, Literal, Union
from datasette.database import QueryInterrupted
from pydantic import (
BaseModel,
ConfigDict,
@ -17,8 +19,14 @@ from sqlite_utils.db import DEFAULT as SQLITE_UTILS_DEFAULT
from datasette.column_types import SQLiteType
from datasette.events import AlterTableEvent, CreateTableEvent, InsertRowsEvent
from datasette.resources import DatabaseResource, TableResource
from datasette.utils import sqlite3
from datasette.utils import (
escape_sqlite,
get_outbound_foreign_keys,
sqlite3,
table_column_details,
)
from datasette.utils.asgi import NotFound, Response
from datasette.utils.sqlite import sqlite_hidden_table_names
from .base import BaseView, _error
@ -41,6 +49,177 @@ ALTER_TABLE_TYPE_FOR_SQLITE_TYPE = {
SQLiteType.REAL: "float",
SQLiteType.BLOB: "blob",
}
# Maximum number of rows read from the source table when sampling column
# values for the foreign key suggestion data check.
FOREIGN_KEY_SUGGESTION_ROW_LIMIT = 500
# Time limit, in milliseconds, passed to each individual suggestion query
# (the sample query and every per-target existence check).
FOREIGN_KEY_SUGGESTION_TIME_LIMIT_MS = 50
# Overall budget, in milliseconds, shared by all per-target existence checks
# in one request. The clock starts after the sample query has finished.
FOREIGN_KEY_SUGGESTION_TOTAL_TIME_LIMIT_MS = 200
class ForeignKeySuggestionTimedOut(Exception):
    """Raised when a foreign key suggestion query runs out of time."""
def _sqlite_type_affinity(type_name):
    """Return the lower-case affinity name for a declared column type.

    The declared type is matched case-insensitively against substring rules
    in a fixed order; the first rule that matches wins. A missing or empty
    declared type yields ``"blob"`` and anything unmatched yields
    ``"numeric"``.
    """
    declared = type_name.upper() if type_name else ""
    if not declared:
        return "blob"
    # Order matters: "CHARINT" contains both "INT" and "CHAR" and must
    # resolve to "integer".
    rules = (
        ("integer", ("INT",)),
        ("text", ("CHAR", "CLOB", "TEXT")),
        ("blob", ("BLOB",)),
        ("real", ("REAL", "FLOA", "DOUB")),
    )
    for affinity, markers in rules:
        for marker in markers:
            if marker in declared:
                return affinity
    return "numeric"
def _foreign_key_type_compatible(source_affinity, target_affinity):
    """Return True if a column of one affinity may reference the other.

    Identical affinities are always compatible. Otherwise the pair is
    compatible only when one side is ``"numeric"`` and the other side is
    ``"integer"`` or ``"real"``.
    """
    if source_affinity == target_affinity:
        return True
    if source_affinity == "numeric":
        other = target_affinity
    elif target_affinity == "numeric":
        other = source_affinity
    else:
        return False
    return other in {"integer", "real", "numeric"}
def _public_foreign_key_target(target):
    """Return the subset of a target dict that is exposed in the response.

    Only ``fk_table``, ``fk_column`` and ``type`` are copied; any other keys
    on ``target`` (such as ``affinity``) are left out.
    """
    public_keys = ("fk_table", "fk_column", "type")
    return {key: target[key] for key in public_keys}
def _singular(name):
    """Return a naive singular form of a table name.

    This is a heuristic used only for name matching, not a full
    inflector:

    - ``"categories"`` -> ``"category"`` (trailing ``ies`` becomes ``y``)
    - ``"owners"`` -> ``"owner"`` (a single trailing ``s`` is dropped)
    - names ending in ``ss`` (``"address"``, ``"class"``) are returned
      unchanged, because that ending is not a plural marker
    """
    if name.endswith("ies") and len(name) > 3:
        return name[:-3] + "y"
    if name.endswith("s") and not name.endswith("ss") and len(name) > 1:
        return name[:-1]
    return name


def _foreign_key_name_reasons(source_column, target):
    """Return ``["name_match"]`` if the column is named after the target.

    The comparison is case-insensitive. A column matches when its name is
    ``<table>_<column>`` or ``<singular table>_<column>`` for the target's
    ``fk_table`` and ``fk_column``: for example ``owner_id`` or
    ``owners_id`` for ``owners.id``. Otherwise an empty list is returned.
    """
    source = source_column.lower()
    table = target["fk_table"].lower()
    column = target["fk_column"].lower()
    # The generic "<table>_<column>" pattern already covers "<table>_id"
    # when the target column is "id", so no special case is needed.
    possible_names = {
        "{}_{}".format(prefix, column) for prefix in (table, _singular(table))
    }
    return ["name_match"] if source in possible_names else []
def _foreign_key_option_sort_key(source_column, target):
    """Return the sort key used to order candidate targets for a column.

    Targets whose name matches the source column sort first (rank 0), the
    rest follow (rank 1); ties are broken by table name and then by column
    name.
    """
    rank = 0 if _foreign_key_name_reasons(source_column, target) else 1
    return rank, target["fk_table"], target["fk_column"]
def _foreign_key_suggestion_metadata(conn, table_name):
    """Collect the schema information needed to suggest foreign keys.

    Takes a SQLite connection; the view invokes this through
    ``db.execute_fn``.

    Returns a ``(source_columns, targets, current_by_column)`` tuple:

    - ``source_columns``: one dict per non-hidden column of ``table_name``
      with ``column``, ``type`` (upper-cased declared type) and
      ``affinity``.
    - ``targets``: one dict per *other* table that has exactly one
      non-hidden primary key column, with ``fk_table``, ``fk_column``,
      ``type`` and ``affinity``. Tables whose name starts with ``sqlite_``,
      tables reported by ``sqlite_hidden_table_names()`` and ``table_name``
      itself are skipped, as are tables with no primary key column or a
      compound primary key.
    - ``current_by_column``: maps a column name to the ``fk_table`` and
      ``fk_column`` of its existing outbound foreign key. If a column
      appears in more than one foreign key, the last one listed wins.
    """
    hidden_tables = set(sqlite_hidden_table_names(conn))
    source_columns = [
        {
            "column": column.name,
            "type": (column.type or "").upper(),
            "affinity": _sqlite_type_affinity(column.type),
        }
        for column in table_column_details(conn, table_name)
        if not column.hidden
    ]
    current_by_column = {
        fk["column"]: {
            "fk_table": fk["other_table"],
            "fk_column": fk["other_column"],
        }
        for fk in get_outbound_foreign_keys(conn, table_name)
    }
    table_names = [
        row[0]
        for row in conn.execute(
            "select name from sqlite_master where type = 'table' order by name"
        ).fetchall()
        if not row[0].startswith("sqlite_")
    ]
    targets = []
    for candidate_table in table_names:
        if candidate_table == table_name or candidate_table in hidden_tables:
            continue
        pks = [
            column
            for column in table_column_details(conn, candidate_table)
            if column.is_pk and not column.hidden
        ]
        # Only single-column primary keys can be the target of the
        # single-column foreign keys this endpoint suggests.
        if len(pks) != 1:
            continue
        pk = pks[0]
        targets.append(
            {
                "fk_table": candidate_table,
                "fk_column": pk.name,
                "type": (pk.type or "").upper(),
                "affinity": _sqlite_type_affinity(pk.type),
            }
        )
    return source_columns, targets, current_by_column
async def _foreign_key_suggestion_samples(db, table_name, columns):
    """Read a bounded sample of rows and collect distinct values per column.

    Selects ``columns`` from ``table_name`` with a row limit of
    ``FOREIGN_KEY_SUGGESTION_ROW_LIMIT`` and a time limit of
    ``FOREIGN_KEY_SUGGESTION_TIME_LIMIT_MS``.

    Returns ``(row_count, values_by_column)`` where ``values_by_column``
    maps each column name to its distinct non-null values in first-seen
    order. With no columns the query is skipped and ``(0, {})`` is
    returned.

    Raises ``ForeignKeySuggestionTimedOut`` if the query is interrupted.
    """
    if not columns:
        return 0, {}
    select_list = ", ".join(escape_sqlite(name) for name in columns)
    sql = "select {} from {} limit {}".format(
        select_list,
        escape_sqlite(table_name),
        FOREIGN_KEY_SUGGESTION_ROW_LIMIT,
    )
    try:
        results = await db.execute(
            sql,
            custom_time_limit=FOREIGN_KEY_SUGGESTION_TIME_LIMIT_MS,
            log_sql_errors=False,
        )
    except QueryInterrupted as e:
        raise ForeignKeySuggestionTimedOut from e
    rows = results.rows
    # Each inner dict is used as an insertion-ordered set of values.
    distinct_by_column = {name: {} for name in columns}
    for row in rows:
        for name, distinct in distinct_by_column.items():
            value = row[name]
            if value is not None:
                distinct.setdefault(value, None)
    values_by_column = {
        name: list(distinct) for name, distinct in distinct_by_column.items()
    }
    return len(rows), values_by_column
async def _foreign_key_suggestion_values_exist(db, target, values, time_limit_ms):
    """Return True if every sampled value exists in the target column.

    Runs a single ``... where <fk_column> in (?, ?, ...)`` query against
    ``target["fk_table"]`` with ``time_limit_ms`` as its time limit, then
    checks that each entry of ``values`` is among the values returned.
    An empty ``values`` returns False without running a query.

    Raises ``ForeignKeySuggestionTimedOut`` if the query is interrupted.
    """
    if not values:
        return False
    sql = "select {column} from {table} where {column} in ({placeholders})".format(
        column=escape_sqlite(target["fk_column"]),
        table=escape_sqlite(target["fk_table"]),
        placeholders=", ".join(["?"] * len(values)),
    )
    try:
        results = await db.execute(
            sql,
            params=values,
            custom_time_limit=time_limit_ms,
            log_sql_errors=False,
        )
    except QueryInterrupted as e:
        raise ForeignKeySuggestionTimedOut from e
    found = {row[0] for row in results.rows}
    for value in values:
        if value not in found:
            return False
    return True
async def _create_table_ui_context(
@ -609,6 +788,128 @@ class TableCreateView(BaseView):
return Response.json(details, status=201)
class TableForeignKeySuggestionsView(BaseView):
    """JSON endpoint suggesting single-column foreign keys for a table."""

    name = "table-foreign-key-suggestions"

    def __init__(self, datasette):
        self.ds = datasette

    async def get(self, request):
        """Return foreign key options and data-backed suggestions as JSON.

        Responds with an error payload when the table cannot be resolved
        (404), when the resource is a view (400) or when the actor lacks
        ``alter-table`` (403). Otherwise every non-hidden column is listed
        with its type-compatible ``options``; a bounded data check then
        fills in ``suggestions``. If that check times out the response is
        still returned, with ``row_check["status"]`` set to
        ``"timed_out"``.
        """
        try:
            resolved = await self.ds.resolve_table(request)
        except NotFound as e:
            return _error([e.args[0]], 404)

        db = resolved.db
        database_name = db.name
        table_name = resolved.table

        if resolved.is_view:
            return _error(["Cannot suggest foreign keys for a view"], 400)

        can_alter = await self.ds.allowed(
            action="alter-table",
            resource=TableResource(database=database_name, table=table_name),
            actor=request.actor,
        )
        if not can_alter:
            return _error(["Permission denied: need alter-table"], 403)

        def load_metadata(conn):
            return _foreign_key_suggestion_metadata(conn, table_name)

        source_columns, targets, current_by_column = await db.execute_fn(
            load_metadata
        )

        # Build the per-column payload along with the full (non-public)
        # option dicts that the data check below needs.
        columns = []
        options_by_column = {}
        for source in source_columns:
            source_name = source["column"]
            source_affinity = source["affinity"]
            compatible = [
                target
                for target in targets
                if _foreign_key_type_compatible(source_affinity, target["affinity"])
            ]
            compatible.sort(
                key=lambda target: _foreign_key_option_sort_key(source_name, target)
            )
            options_by_column[source_name] = compatible
            columns.append(
                {
                    "column": source_name,
                    "type": source["type"],
                    "affinity": source_affinity,
                    "current": current_by_column.get(source_name),
                    "suggestions": [],
                    "options": [
                        _public_foreign_key_target(option) for option in compatible
                    ],
                }
            )

        # Only columns with at least one candidate target are sampled.
        columns_to_sample = [
            info["column"] for info in columns if options_by_column[info["column"]]
        ]
        will_sample = bool(columns_to_sample)
        row_check = {
            "attempted": will_sample,
            "status": "completed" if will_sample else "skipped",
            "row_limit": FOREIGN_KEY_SUGGESTION_ROW_LIMIT,
            "sampled_rows": 0,
            "checked_options": 0,
        }

        try:
            sampled_rows, values_by_column = await _foreign_key_suggestion_samples(
                db, table_name, columns_to_sample
            )
            row_check["sampled_rows"] = sampled_rows
            # The overall budget covers only the existence checks below.
            deadline = time.perf_counter() + (
                FOREIGN_KEY_SUGGESTION_TOTAL_TIME_LIMIT_MS / 1000
            )
            for column_info in columns:
                checked_name = column_info["column"]
                values = values_by_column.get(checked_name)
                if not values:
                    continue
                for option in options_by_column[checked_name]:
                    remaining_ms = int((deadline - time.perf_counter()) * 1000)
                    if remaining_ms <= 0:
                        raise ForeignKeySuggestionTimedOut
                    all_present = await _foreign_key_suggestion_values_exist(
                        db,
                        option,
                        values,
                        min(FOREIGN_KEY_SUGGESTION_TIME_LIMIT_MS, remaining_ms),
                    )
                    if all_present:
                        name_reasons = _foreign_key_name_reasons(checked_name, option)
                        column_info["suggestions"].append(
                            {
                                "fk_table": option["fk_table"],
                                "fk_column": option["fk_column"],
                                "confidence": "sampled",
                                "sampled_values": len(values),
                                "reasons": [
                                    "type_match",
                                    "sample_values_exist",
                                    *name_reasons,
                                ],
                            }
                        )
                    row_check["checked_options"] += 1
        except ForeignKeySuggestionTimedOut:
            # Fail open: keep the options (and any suggestions found so
            # far) and flag the incomplete check.
            row_check["status"] = "timed_out"

        payload = {
            "ok": True,
            "database": database_name,
            "table": table_name,
            "row_check": row_check,
            "columns": columns,
        }
        return Response.json(payload)
class TableAlterView(BaseView):
name = "table-alter"

View file

@ -2097,6 +2097,64 @@ To use the ``"replace": true`` option you will also need the :ref:`actions_updat
Pass ``"alter": true`` to automatically add any missing columns to the existing table that are present in the rows you are submitting. This requires the :ref:`actions_alter_table` permission.
.. _TableForeignKeySuggestionsView:
Table foreign key suggestions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ``/<database>/<table>/-/foreign-key-suggestions`` endpoint suggests possible single-column foreign key relationships for a table. This requires the :ref:`actions_alter_table` permission.
::
GET /<database>/<table>/-/foreign-key-suggestions
For each column in the table, ``options`` lists every type-compatible single-column primary key in another table that the column could reference. Datasette also performs a bounded data check against up to 500 rows in the table: if the sampled non-null values for a column all exist in a target primary key, that target is included in ``suggestions``.
If the bounded check takes too long, the endpoint fails open. It still returns the type-compatible ``options`` for each column, but ``row_check.status`` will be ``"timed_out"`` and ``suggestions`` may be incomplete or empty.
.. code-block:: json
{
"ok": true,
"database": "data",
"table": "projects",
"row_check": {
"attempted": true,
"status": "completed",
"row_limit": 500,
"sampled_rows": 3,
"checked_options": 4
},
"columns": [
{
"column": "owner_id",
"type": "INTEGER",
"affinity": "integer",
"current": null,
"suggestions": [
{
"fk_table": "owners",
"fk_column": "id",
"confidence": "sampled",
"sampled_values": 3,
"reasons": [
"type_match",
"sample_values_exist",
"name_match"
]
}
],
"options": [
{
"fk_table": "owners",
"fk_column": "id",
"type": "INTEGER"
}
]
}
]
}
.. _TableAlterView:
Altering tables

View file

@ -1044,6 +1044,106 @@ async def test_alter_table_foreign_key_without_fk_column_requires_single_pk(ds_w
}
@pytest.mark.asyncio
async def test_foreign_key_suggestions(ds_write):
    """Options are type-compatible targets; suggestions need matching data.

    The ``docs`` table is presumably created by the ``ds_write`` fixture;
    this test only inserts into it.
    """
    token = write_token(ds_write, permissions=["at"])
    db = ds_write.get_database("data")
    setup_statements = (
        "create table owners (id integer primary key)",
        "insert into owners (id) values (1), (2), (3)",
        "create table categories (slug text primary key)",
        "insert into categories (slug) values ('one'), ('two')",
        "create table numbers (id integer primary key)",
        "insert into numbers (id) values (10), (20)",
        "create table weights (id real primary key)",
        "insert into weights (id) values (1.5), (2.5)",
        "insert into docs (id, title, score, age) values "
        "(1, 'one', 1.5, 1), (2, 'two', 999.5, 2), (3, null, null, null)",
    )
    for statement in setup_statements:
        await db.execute_write(statement)

    response = await ds_write.client.get(
        "/data/docs/-/foreign-key-suggestions",
        headers=_headers(token),
    )
    assert response.status_code == 200, response.text
    data = response.json()
    assert data["ok"] is True
    assert data["database"] == "data"
    assert data["table"] == "docs"

    row_check = data["row_check"]
    assert row_check["attempted"] is True
    assert row_check["status"] == "completed"
    assert row_check["row_limit"] == 500
    assert row_check["sampled_rows"] == 3

    columns = {column["column"]: column for column in data["columns"]}

    # age holds 1 and 2: both integer keys are options, only owners has both
    age = columns["age"]
    assert age["options"] == [
        {"fk_table": "numbers", "fk_column": "id", "type": "INTEGER"},
        {"fk_table": "owners", "fk_column": "id", "type": "INTEGER"},
    ]
    assert age["suggestions"] == [
        {
            "fk_table": "owners",
            "fk_column": "id",
            "confidence": "sampled",
            "sampled_values": 2,
            "reasons": ["type_match", "sample_values_exist"],
        }
    ]

    title = columns["title"]
    assert title["options"] == [
        {"fk_table": "categories", "fk_column": "slug", "type": "TEXT"}
    ]
    assert title["suggestions"][0]["fk_table"] == "categories"

    # score includes 999.5, which is not in weights, so nothing is suggested
    score = columns["score"]
    assert score["options"] == [
        {"fk_table": "weights", "fk_column": "id", "type": "REAL"}
    ]
    assert score["suggestions"] == []
@pytest.mark.asyncio
async def test_foreign_key_suggestions_permission_denied(ds_write):
    """A token without alter-table gets a 403 error payload."""
    # "ir" is presumably a non-alter permission abbreviation - confirm
    # against write_token.
    headers = _headers(write_token(ds_write, permissions=["ir"]))
    response = await ds_write.client.get(
        "/data/docs/-/foreign-key-suggestions",
        headers=headers,
    )
    assert response.status_code == 403
    expected = {
        "ok": False,
        "errors": ["Permission denied: need alter-table"],
    }
    assert response.json() == expected
@pytest.mark.asyncio
async def test_foreign_key_suggestions_fail_open(ds_write, monkeypatch):
    """A timeout while sampling still returns options, with no suggestions."""
    from datasette.views import table_create_alter

    async def always_time_out(*args, **kwargs):
        raise table_create_alter.ForeignKeySuggestionTimedOut

    token = write_token(ds_write, permissions=["at"])
    db = ds_write.get_database("data")
    await db.execute_write("create table owners (id integer primary key)")

    # Replace the sampling step so the view takes its timed-out path
    monkeypatch.setattr(
        table_create_alter,
        "_foreign_key_suggestion_samples",
        always_time_out,
    )

    response = await ds_write.client.get(
        "/data/docs/-/foreign-key-suggestions",
        headers=_headers(token),
    )
    assert response.status_code == 200, response.text
    data = response.json()
    assert data["row_check"]["status"] == "timed_out"

    age = {column["column"]: column for column in data["columns"]}["age"]
    assert age["options"] == [
        {"fk_table": "owners", "fk_column": "id", "type": "INTEGER"}
    ]
    assert age["suggestions"] == []
@pytest.mark.asyncio
async def test_alter_table_permission_denied(ds_write):
token = write_token(ds_write, permissions=["ir"])