From feaba9b18b11a39bb4a929ff316f093eb138f2f3 Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Wed, 18 Mar 2026 11:37:09 -0700 Subject: [PATCH] Optionally limit ColumnType subclasses to specific SQLite types (#2673) * ColumnTypes now have optional SQLite column types Refs #2672 --- datasette/app.py | 82 +++++++++++++++++++++++++++++-- datasette/column_types.py | 39 +++++++++++++++ datasette/default_column_types.py | 5 +- datasette/hookspecs.py | 2 +- docs/configuration.rst | 3 +- docs/internals.rst | 1 + docs/plugin_hooks.rst | 7 ++- tests/test_column_types.py | 69 +++++++++++++++++++++++++- 8 files changed, 198 insertions(+), 10 deletions(-) diff --git a/datasette/app.py b/datasette/app.py index 3790b340..6e3e6815 100644 --- a/datasette/app.py +++ b/datasette/app.py @@ -43,6 +43,7 @@ from jinja2.environment import Template from jinja2.exceptions import TemplateNotFound from .events import Event +from .column_types import SQLiteType from .views import Context from .views.database import database_download, DatabaseView, TableCreateView, QueryView from .views.index import IndexView @@ -959,6 +960,63 @@ class Datasette: # Column types API + async def _get_resource_column_details(self, database: str, resource: str): + db = self.databases.get(database) + if db is None: + return {} + try: + return { + column.name: column + for column in await db.table_column_details(resource) + } + except sqlite3.OperationalError: + return {} + + @staticmethod + def _column_type_is_applicable(ct_cls, column_detail) -> bool: + sqlite_types = getattr(ct_cls, "sqlite_types", None) + if sqlite_types is None: + return True + if column_detail is None: + return False + actual_sqlite_type = SQLiteType.from_declared_type(column_detail.type) + return actual_sqlite_type in sqlite_types + + async def _validate_column_type_assignment( + self, database: str, resource: str, column: str, ct_cls + ) -> None: + sqlite_types = getattr(ct_cls, "sqlite_types", None) + if sqlite_types is None: + return + + column_detail = ( + await self._get_resource_column_details(database, resource) + ).get(column) + if column_detail is None: + return + + actual_sqlite_type = SQLiteType.from_declared_type(column_detail.type) + if actual_sqlite_type in sqlite_types: + return + + allowed = ", ".join(sqlite_type.value for sqlite_type in sqlite_types) + actual = ( + actual_sqlite_type.value + if actual_sqlite_type is not None + else "unrecognized {!r}".format(column_detail.type) + ) + raise ValueError( + "Column type {!r} is only applicable to SQLite types {} but {}.{}.{} " + "has SQLite type {}".format( + ct_cls.name, + allowed, + database, + resource, + column, + actual, + ) + ) + async def _apply_column_types_config(self): """Load column_types from datasette.json config into the internal DB.""" import logging @@ -980,9 +1038,12 @@ class Datasette: table_name, col_name, ) - await self.set_column_type( - db_name, table_name, col_name, col_type, config - ) + try: + await self.set_column_type( + db_name, table_name, col_name, col_type, config + ) + except ValueError as ex: + logging.warning(str(ex)) async def get_column_type(self, database: str, resource: str, column: str): """ @@ -1001,6 +1062,11 @@ class Datasette: ct_cls = self._column_types.get(ct_name) if ct_cls is None: return None + column_detail = ( + await self._get_resource_column_details(database, resource) + ).get(column) + if not self._column_type_is_applicable(ct_cls, column_detail): + return None return ct_cls(config=json.loads(config) if config else None) async def get_column_types(self, database: str, resource: str) -> dict: @@ -1013,11 +1079,14 @@ class Datasette: "WHERE database_name = ? AND resource_name = ?", [database, resource], ) + column_details = await self._get_resource_column_details(database, resource) result = {} for row in rows.rows: col_name, ct_name, config = row ct_cls = self._column_types.get(ct_name) - if ct_cls is not None: + if ct_cls is not None and self._column_type_is_applicable( + ct_cls, column_details.get(col_name) + ): result[col_name] = ct_cls(config=json.loads(config) if config else None) return result @@ -1030,6 +1099,11 @@ class Datasette: config: dict = None, ) -> None: """Assign a column type. Overwrites any existing assignment.""" + ct_cls = self._column_types.get(column_type) + if ct_cls is not None: + await self._validate_column_type_assignment( + database, resource, column, ct_cls + ) await self.get_internal_database().execute_write( """INSERT OR REPLACE INTO column_types (database_name, resource_name, column_name, column_type, config) diff --git a/datasette/column_types.py b/datasette/column_types.py index c4114294..7320e1d6 100644 --- a/datasette/column_types.py +++ b/datasette/column_types.py @@ -1,3 +1,39 @@ +from enum import Enum + + +class SQLiteType(Enum): + TEXT = "TEXT" + INTEGER = "INTEGER" + REAL = "REAL" + BLOB = "BLOB" + NULL = "NULL" + + @classmethod + def from_declared_type(cls, declared_type: str | None) -> "SQLiteType | None": + if declared_type is None: + return cls.NULL + + normalized = declared_type.strip().upper() + if not normalized: + return cls.NULL + + if normalized == cls.NULL.value: + return cls.NULL + if "INT" in normalized: + return cls.INTEGER + if any(token in normalized for token in ("CHAR", "CLOB", "TEXT")): + return cls.TEXT + if "BLOB" in normalized: + return cls.BLOB + if any( + token in normalized + for token in ("REAL", "FLOA", "DOUB") # codespell:ignore doub + ): + return cls.REAL + + return None + + class ColumnType: """ Base class for column types. @@ -8,6 +44,8 @@ class ColumnType: Examples: "markdown", "file", "email", "url", "point", "image". - ``description``: Human-readable label for admin UI dropdowns. Examples: "Markdown text", "File reference", "Email address". + - ``sqlite_types``: Optional tuple of SQLiteType values restricting + which SQLite column types this ColumnType can be assigned to. Instantiate with an optional ``config`` dict to bind per-column configuration:: @@ -18,6 +56,7 @@ class ColumnType: name: str description: str + sqlite_types: tuple[SQLiteType, ...] | None = None def __init__(self, config=None): self.config = config diff --git a/datasette/default_column_types.py b/datasette/default_column_types.py index b4ebfcc5..24493994 100644 --- a/datasette/default_column_types.py +++ b/datasette/default_column_types.py @@ -4,12 +4,13 @@ import re import markupsafe from datasette import hookimpl -from datasette.column_types import ColumnType +from datasette.column_types import ColumnType, SQLiteType class UrlColumnType(ColumnType): name = "url" description = "URL" + sqlite_types = (SQLiteType.TEXT,) async def render_cell(self, value, column, table, database, datasette, request): if not value or not isinstance(value, str): @@ -30,6 +31,7 @@ class UrlColumnType(ColumnType): class EmailColumnType(ColumnType): name = "email" description = "Email address" + sqlite_types = (SQLiteType.TEXT,) async def render_cell(self, value, column, table, database, datasette, request): if not value or not isinstance(value, str): @@ -50,6 +52,7 @@ class EmailColumnType(ColumnType): class JsonColumnType(ColumnType): name = "json" description = "JSON data" + sqlite_types = (SQLiteType.TEXT,) async def render_cell(self, value, column, table, database, datasette, request): if value is None: diff --git a/datasette/hookspecs.py b/datasette/hookspecs.py index f7bb6ab6..2ab9d0c5 100644 --- a/datasette/hookspecs.py +++ b/datasette/hookspecs.py @@ -86,7 +86,7 @@ def register_actions(datasette): @hookspec def register_column_types(datasette): - """Return a list of ColumnType instances""" + """Return a list of ColumnType subclasses""" @hookspec diff --git a/docs/configuration.rst b/docs/configuration.rst index b61c3692..8c8c8a67 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -1103,6 +1103,8 @@ These configure :ref:`full-text search ` for a table or view. You can assign semantic column types to columns, which affect how values are rendered, validated, and transformed. Built-in column types include ``url``, ``email``, and ``json``. Plugins can register additional column types using the :ref:`register_column_types ` plugin hook. +Column types can optionally declare which SQLite column types they apply to using ``sqlite_types``. Datasette will reject incompatible assignments. The built-in ``url``, ``email``, and ``json`` column types are all restricted to ``TEXT`` columns. + The simplest form maps column names to type name strings: .. [[[cog @@ -1210,4 +1212,3 @@ For column types that accept additional configuration, use an object with ``type } .. [[[end]]] - diff --git a/docs/internals.rst b/docs/internals.rst index 544dd7fd..2442e687 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -968,6 +968,7 @@ await .set_column_type(database, resource, column, column_type, config=None) Optional configuration dict for the column type. Assigns a column type to a column. Overwrites any existing assignment for that column. +Raises ``ValueError`` if the column type declares ``sqlite_types`` and the target column does not match one of those SQLite types. .. code-block:: python diff --git a/docs/plugin_hooks.rst b/docs/plugin_hooks.rst index 69710bb6..53a47334 100644 --- a/docs/plugin_hooks.rst +++ b/docs/plugin_hooks.rst @@ -1004,13 +1004,14 @@ Return a list of :ref:`ColumnType ` **subclasses** (not instances) .. code-block:: python from datasette import hookimpl - from datasette.column_types import ColumnType + from datasette.column_types import ColumnType, SQLiteType import markupsafe class ColorColumnType(ColumnType): name = "color" description = "CSS color value" + sqlite_types = (SQLiteType.TEXT,) async def render_cell( self, @@ -1052,6 +1053,9 @@ Each ``ColumnType`` subclass must define the following class attributes: ``description`` - string Human-readable label, e.g. ``"CSS color value"``. +``sqlite_types`` - tuple of ``SQLiteType`` values, optional + Restrict assignments of this column type to columns with matching SQLite types, e.g. ``(SQLiteType.TEXT,)``. If omitted, the column type can be assigned to any column. + And the following methods, all optional: ``render_cell(self, value, column, table, database, datasette, request)`` @@ -2485,4 +2489,3 @@ Tokens can then be created and verified using :ref:`datasette.create_token()