Optionally limit ColumnType subclasses to specific SQLite types (#2673)

* ColumnTypes now have optional SQLite column types

Refs #2672
This commit is contained in:
Simon Willison 2026-03-18 11:37:09 -07:00 committed by GitHub
commit feaba9b18b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 198 additions and 10 deletions

View file

@ -43,6 +43,7 @@ from jinja2.environment import Template
from jinja2.exceptions import TemplateNotFound
from .events import Event
from .column_types import SQLiteType
from .views import Context
from .views.database import database_download, DatabaseView, TableCreateView, QueryView
from .views.index import IndexView
@ -959,6 +960,63 @@ class Datasette:
# Column types API
async def _get_resource_column_details(self, database: str, resource: str):
    """
    Return a ``{column_name: column_detail}`` dict for ``resource``.

    An empty dict is returned when ``database`` is not one of
    ``self.databases``, or when looking up the column details raises
    ``sqlite3.OperationalError``.
    """
    target_db = self.databases.get(database)
    if target_db is None:
        return {}
    try:
        details = await target_db.table_column_details(resource)
        return {detail.name: detail for detail in details}
    except sqlite3.OperationalError:
        # Treated as "no columns known" rather than an error for the caller.
        return {}
@staticmethod
def _column_type_is_applicable(ct_cls, column_detail) -> bool:
    """
    Decide whether ``ct_cls`` may be used for the column in ``column_detail``.

    - A column type without a ``sqlite_types`` restriction always applies.
    - A restricted column type never applies when ``column_detail`` is ``None``.
    - Otherwise it applies only when the column's declared type resolves to
      one of the allowed ``SQLiteType`` values.
    """
    allowed_types = getattr(ct_cls, "sqlite_types", None)
    if allowed_types is None:
        return True
    if column_detail is None:
        return False
    return SQLiteType.from_declared_type(column_detail.type) in allowed_types
async def _validate_column_type_assignment(
    self, database: str, resource: str, column: str, ct_cls
) -> None:
    """
    Raise ``ValueError`` if ``ct_cls`` cannot be assigned to the given column.

    Nothing is checked (and nothing is raised) when ``ct_cls`` declares no
    ``sqlite_types`` or when no details are available for ``column``.
    """
    allowed_types = getattr(ct_cls, "sqlite_types", None)
    if allowed_types is None:
        return

    details = await self._get_resource_column_details(database, resource)
    detail = details.get(column)
    if detail is None:
        return

    resolved = SQLiteType.from_declared_type(detail.type)
    if resolved in allowed_types:
        return

    # Build the human-readable pieces of the error message.
    allowed_label = ", ".join(member.value for member in allowed_types)
    if resolved is None:
        actual_label = "unrecognized {!r}".format(detail.type)
    else:
        actual_label = resolved.value
    message = (
        "Column type {!r} is only applicable to SQLite types {} but {}.{}.{} "
        "has SQLite type {}"
    ).format(ct_cls.name, allowed_label, database, resource, column, actual_label)
    raise ValueError(message)
async def _apply_column_types_config(self):
"""Load column_types from datasette.json config into the internal DB."""
import logging
@ -980,9 +1038,12 @@ class Datasette:
table_name,
col_name,
)
await self.set_column_type(
db_name, table_name, col_name, col_type, config
)
try:
await self.set_column_type(
db_name, table_name, col_name, col_type, config
)
except ValueError as ex:
logging.warning(str(ex))
async def get_column_type(self, database: str, resource: str, column: str):
"""
@ -1001,6 +1062,11 @@ class Datasette:
ct_cls = self._column_types.get(ct_name)
if ct_cls is None:
return None
column_detail = (
await self._get_resource_column_details(database, resource)
).get(column)
if not self._column_type_is_applicable(ct_cls, column_detail):
return None
return ct_cls(config=json.loads(config) if config else None)
async def get_column_types(self, database: str, resource: str) -> dict:
@ -1013,11 +1079,14 @@ class Datasette:
"WHERE database_name = ? AND resource_name = ?",
[database, resource],
)
column_details = await self._get_resource_column_details(database, resource)
result = {}
for row in rows.rows:
col_name, ct_name, config = row
ct_cls = self._column_types.get(ct_name)
if ct_cls is not None:
if ct_cls is not None and self._column_type_is_applicable(
ct_cls, column_details.get(col_name)
):
result[col_name] = ct_cls(config=json.loads(config) if config else None)
return result
@ -1030,6 +1099,11 @@ class Datasette:
config: dict = None,
) -> None:
"""Assign a column type. Overwrites any existing assignment."""
ct_cls = self._column_types.get(column_type)
if ct_cls is not None:
await self._validate_column_type_assignment(
database, resource, column, ct_cls
)
await self.get_internal_database().execute_write(
"""INSERT OR REPLACE INTO column_types
(database_name, resource_name, column_name, column_type, config)

View file

@ -1,3 +1,39 @@
from enum import Enum
class SQLiteType(Enum):
TEXT = "TEXT"
INTEGER = "INTEGER"
REAL = "REAL"
BLOB = "BLOB"
NULL = "NULL"
@classmethod
def from_declared_type(cls, declared_type: str | None) -> "SQLiteType | None":
if declared_type is None:
return cls.NULL
normalized = declared_type.strip().upper()
if not normalized:
return cls.NULL
if normalized == cls.NULL.value:
return cls.NULL
if "INT" in normalized:
return cls.INTEGER
if any(token in normalized for token in ("CHAR", "CLOB", "TEXT")):
return cls.TEXT
if "BLOB" in normalized:
return cls.BLOB
if any(
token in normalized
for token in ("REAL", "FLOA", "DOUB") # codespell:ignore doub
):
return cls.REAL
return None
class ColumnType:
"""
Base class for column types.
@ -8,6 +44,8 @@ class ColumnType:
Examples: "markdown", "file", "email", "url", "point", "image".
- ``description``: Human-readable label for admin UI dropdowns.
Examples: "Markdown text", "File reference", "Email address".
- ``sqlite_types``: Optional tuple of SQLiteType values restricting
which SQLite column types this ColumnType can be assigned to.
Instantiate with an optional ``config`` dict to bind per-column
configuration::
@ -18,6 +56,7 @@ class ColumnType:
name: str
description: str
sqlite_types: tuple[SQLiteType, ...] | None = None
def __init__(self, config=None):
self.config = config

View file

@ -4,12 +4,13 @@ import re
import markupsafe
from datasette import hookimpl
from datasette.column_types import ColumnType
from datasette.column_types import ColumnType, SQLiteType
class UrlColumnType(ColumnType):
name = "url"
description = "URL"
sqlite_types = (SQLiteType.TEXT,)
async def render_cell(self, value, column, table, database, datasette, request):
if not value or not isinstance(value, str):
@ -30,6 +31,7 @@ class UrlColumnType(ColumnType):
class EmailColumnType(ColumnType):
name = "email"
description = "Email address"
sqlite_types = (SQLiteType.TEXT,)
async def render_cell(self, value, column, table, database, datasette, request):
if not value or not isinstance(value, str):
@ -50,6 +52,7 @@ class EmailColumnType(ColumnType):
class JsonColumnType(ColumnType):
name = "json"
description = "JSON data"
sqlite_types = (SQLiteType.TEXT,)
async def render_cell(self, value, column, table, database, datasette, request):
if value is None:

View file

@ -86,7 +86,7 @@ def register_actions(datasette):
@hookspec
def register_column_types(datasette):
    """Return a list of ColumnType subclasses (the classes themselves, not instances)"""
@hookspec

View file

@ -1103,6 +1103,8 @@ These configure :ref:`full-text search <full_text_search>` for a table or view.
You can assign semantic column types to columns, which affect how values are rendered, validated, and transformed. Built-in column types include ``url``, ``email``, and ``json``. Plugins can register additional column types using the :ref:`register_column_types <plugin_register_column_types>` plugin hook.
Column types can optionally declare which SQLite column types they apply to using ``sqlite_types``. Datasette will reject incompatible assignments; an incompatible assignment made in configuration is logged as a warning and skipped. The built-in ``url``, ``email``, and ``json`` column types are all restricted to ``TEXT`` columns.
The simplest form maps column names to type name strings:
.. [[[cog
@ -1210,4 +1212,3 @@ For column types that accept additional configuration, use an object with ``type
}
.. [[[end]]]

View file

@ -968,6 +968,7 @@ await .set_column_type(database, resource, column, column_type, config=None)
Optional configuration dict for the column type.
Assigns a column type to a column. Overwrites any existing assignment for that column.
Raises ``ValueError`` if the column type declares ``sqlite_types`` and the target column's declared SQLite type does not match one of them. The check is skipped when the column cannot be found.
.. code-block:: python

View file

@ -1004,13 +1004,14 @@ Return a list of :ref:`ColumnType <column_types>` **subclasses** (not instances)
.. code-block:: python
from datasette import hookimpl
from datasette.column_types import ColumnType
from datasette.column_types import ColumnType, SQLiteType
import markupsafe
class ColorColumnType(ColumnType):
name = "color"
description = "CSS color value"
sqlite_types = (SQLiteType.TEXT,)
async def render_cell(
self,
@ -1052,6 +1053,9 @@ Each ``ColumnType`` subclass must define the following class attributes:
``description`` - string
Human-readable label, e.g. ``"CSS color value"``.
``sqlite_types`` - tuple of ``SQLiteType`` values, optional
Restrict assignments of this column type to columns with matching SQLite types, e.g. ``(SQLiteType.TEXT,)``. If omitted, the column type can be assigned to any column.
And the following methods, all optional:
``render_cell(self, value, column, table, database, datasette, request)``
@ -2485,4 +2489,3 @@ Tokens can then be created and verified using :ref:`datasette.create_token() <da
actor = await datasette.verify_token(token)
If no handlers are registered, ``create_token()`` raises ``RuntimeError``. If the requested ``handler`` name is not found, it raises ``ValueError``.

View file

@ -1,7 +1,10 @@
import logging
from datasette.app import Datasette
from datasette.column_types import ColumnType
from datasette.column_types import (
ColumnType,
SQLiteType,
)
from datasette.hookspecs import hookimpl
from datasette.plugins import pm
from datasette.utils import sqlite3
@ -183,6 +186,32 @@ async def test_set_column_type_with_config(ds_ct):
assert ct.config == {"max_length": 200}
@pytest.mark.asyncio
async def test_set_column_type_rejects_incompatible_sqlite_type(ds_ct):
    """Assigning the TEXT-only ``json`` type to ``posts.id`` must raise ``ValueError``."""
    await ds_ct.invoke_startup()
    expected_message = "only applicable to SQLite types TEXT"
    with pytest.raises(ValueError, match=expected_message):
        await ds_ct.set_column_type("data", "posts", "id", "json")
@pytest.mark.asyncio
async def test_set_column_type_allows_varchar_for_text_only_type(tmp_path_factory):
    """A ``varchar(255)`` column resolves to TEXT, so the ``url`` type can be assigned."""
    db_path = str(tmp_path_factory.mktemp("dbs") / "data.db")
    conn = sqlite3.connect(db_path)
    conn.execute("vacuum")
    conn.execute("create table links (id integer primary key, url varchar(255))")
    conn.commit()

    datasette = Datasette([db_path])
    await datasette.invoke_startup()
    await datasette.set_column_type("data", "links", "url", "url")
    column_type = await datasette.get_column_type("data", "links", "url")
    assert column_type.name == "url"

    # Release the file handles opened by this test.
    conn.close()
    for opened in datasette.databases.values():
        if not opened.is_memory:
            opened.close()
# --- Plugin registration ---
@ -202,9 +231,23 @@ async def test_column_type_class_attributes(ds_ct):
url_cls = ds_ct._column_types["url"]
assert url_cls.name == "url"
assert url_cls.description == "URL"
assert url_cls.sqlite_types == (SQLiteType.TEXT,)
email_cls = ds_ct._column_types["email"]
assert email_cls.name == "email"
assert email_cls.description == "Email address"
assert email_cls.sqlite_types == (SQLiteType.TEXT,)
json_cls = ds_ct._column_types["json"]
assert json_cls.sqlite_types == (SQLiteType.TEXT,)
def test_sqlite_type_from_declared_type():
    """``SQLiteType.from_declared_type`` resolves declared types as expected."""
    expected_by_declared_type = {
        "text": SQLiteType.TEXT,
        "varchar(255)": SQLiteType.TEXT,
        "integer": SQLiteType.INTEGER,
        "float": SQLiteType.REAL,
        "blob": SQLiteType.BLOB,
        "": SQLiteType.NULL,
    }
    for declared, expected in expected_by_declared_type.items():
        assert SQLiteType.from_declared_type(declared) == expected
    # No rule matches "numeric", so it is reported as unrecognized.
    assert SQLiteType.from_declared_type("numeric") is None
# --- JSON API ---
@ -658,6 +701,30 @@ async def test_unknown_type_warning_logged(tmp_path_factory, caplog):
database.close()
@pytest.mark.asyncio
async def test_incompatible_sqlite_type_warning_logged(tmp_path_factory, caplog):
    """Configuring ``json`` on an integer column logs a warning and assigns no type."""
    db_path = str(tmp_path_factory.mktemp("dbs") / "data.db")
    conn = sqlite3.connect(db_path)
    conn.execute("vacuum")
    conn.execute("create table t (id integer primary key, col integer)")
    conn.commit()

    table_config = {"column_types": {"col": "json"}}
    datasette = Datasette(
        [db_path],
        config={"databases": {"data": {"tables": {"t": table_config}}}},
    )
    with caplog.at_level(logging.WARNING):
        await datasette.invoke_startup()

    assert "only applicable to sqlite types text" in caplog.text.lower()
    assert await datasette.get_column_type("data", "t", "col") is None

    # Release the file handles opened by this test.
    conn.close()
    for opened in datasette.databases.values():
        if not opened.is_memory:
            opened.close()
# --- Config overwrites on restart ---