diff --git a/datasette/views/table.py b/datasette/views/table.py
index 3c9b6656..20d78164 100644
--- a/datasette/views/table.py
+++ b/datasette/views/table.py
@@ -1851,7 +1851,20 @@ async def table_view_data(
}
)
raw_sqlite_rows = rows[:page_size]
- data["rows"] = [dict(r) for r in raw_sqlite_rows]
+ # Apply transform_value for columns with assigned types
+ ct_map = await datasette.get_column_types(database_name, table_name)
+ transformed_rows = []
+ for r in raw_sqlite_rows:
+ row_dict = dict(r)
+ for col_name, (ct_name, ct_config) in ct_map.items():
+ if col_name in row_dict:
+ ct_class = datasette.get_column_type_class(ct_name)
+ if ct_class:
+ row_dict[col_name] = await ct_class.transform_value(
+ row_dict[col_name], ct_config, datasette
+ )
+ transformed_rows.append(row_dict)
+ data["rows"] = transformed_rows
if context_for_html_hack:
data.update(extra_context_from_filters)
diff --git a/tests/test_column_types.py b/tests/test_column_types.py
index 3cbadf5e..efb8fbc7 100644
--- a/tests/test_column_types.py
+++ b/tests/test_column_types.py
@@ -1,7 +1,15 @@
+import logging
+
+import logging
+
from datasette.app import Datasette
from datasette.column_types import ColumnType
+from datasette.hookspecs import hookimpl
+from datasette.plugins import pm
from datasette.utils import sqlite3
+from datasette.utils import StartupError
import json
+import markupsafe
import pytest
import time
@@ -367,3 +375,343 @@ async def test_render_cell_extra_with_column_types(ds_ct):
rendered = data["render_cell"][0]
assert "mailto:" in rendered["author_email"]
assert "href" in rendered["website"]
+
+
+# --- Duplicate column type name ---
+
+
+@pytest.mark.asyncio
+async def test_duplicate_column_type_name_raises_error():
+ class DuplicateUrlType(ColumnType):
+ async def render_cell(self, value, column, table, database, datasette, request, config):
+ return None
+
+ class _Plugin:
+ @hookimpl
+ def register_column_types(self, datasette):
+ return [DuplicateUrlType(name="url", description="Duplicate URL")]
+
+ plugin = _Plugin()
+ pm.register(plugin, name="test_duplicate_ct")
+ try:
+ ds = Datasette()
+ with pytest.raises(StartupError, match="Duplicate column type name: url"):
+ await ds.invoke_startup()
+ finally:
+ pm.unregister(plugin, name="test_duplicate_ct")
+
+
+# --- Row endpoint ---
+
+
+@pytest.mark.asyncio
+async def test_row_endpoint_render_cell_with_column_types(ds_ct):
+ await ds_ct.invoke_startup()
+ response = await ds_ct.client.get("/data/posts/1.json?_extra=render_cell")
+ assert response.status_code == 200
+ data = response.json()
+ rendered = data["render_cell"][0]
+ assert "mailto:" in rendered["author_email"]
+ assert "href" in rendered["website"]
+
+
+# --- transform_value in JSON output ---
+
+
+@pytest.mark.asyncio
+async def test_transform_value_in_json_output(tmp_path_factory):
+ """A column type with transform_value should modify rows in JSON API."""
+
+ class UpperColumnType(ColumnType):
+ async def transform_value(self, value, config, datasette):
+ if isinstance(value, str):
+ return value.upper()
+ return value
+
+ class _Plugin:
+ @hookimpl
+ def register_column_types(self, datasette):
+ return [UpperColumnType(name="upper", description="Uppercase")]
+
+ plugin = _Plugin()
+ pm.register(plugin, name="test_transform_ct")
+ try:
+ db_directory = tmp_path_factory.mktemp("dbs")
+ db_path = str(db_directory / "data.db")
+ db = sqlite3.connect(str(db_path))
+ db.execute("vacuum")
+ db.execute("create table t (id integer primary key, name text)")
+ db.execute("insert into t values (1, 'hello')")
+ db.commit()
+ ds = Datasette(
+ [db_path],
+ config={
+ "databases": {
+ "data": {
+ "tables": {
+ "t": {
+ "column_types": {"name": "upper"}
+ }
+ }
+ }
+ }
+ },
+ )
+ await ds.invoke_startup()
+ response = await ds.client.get("/data/t.json")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["rows"][0]["name"] == "HELLO"
+ db.close()
+ for database in ds.databases.values():
+ if not database.is_memory:
+ database.close()
+ finally:
+ pm.unregister(plugin, name="test_transform_ct")
+
+
+# --- Column type priority over plugins ---
+
+
+@pytest.mark.asyncio
+async def test_column_type_render_cell_has_priority_over_plugins(tmp_path_factory):
+ """Column type render_cell should take priority over render_cell plugin hook."""
+
+ class PriorityColumnType(ColumnType):
+ async def render_cell(self, value, column, table, database, datasette, request, config):
+ if value is not None:
+ return markupsafe.Markup(f"COLUMN_TYPE:{markupsafe.escape(value)}")
+ return None
+
+ class _ColumnTypePlugin:
+ @hookimpl
+ def register_column_types(self, datasette):
+ return [PriorityColumnType(name="priority_test", description="Priority test")]
+
+ class _RenderCellPlugin:
+ @hookimpl
+ def render_cell(self, row, value, column, table, pks, database, datasette, request,
+ column_type, column_type_config):
+ if column == "name":
+ return markupsafe.Markup(f"PLUGIN:{markupsafe.escape(value)}")
+
+ ct_plugin = _ColumnTypePlugin()
+ rc_plugin = _RenderCellPlugin()
+ pm.register(ct_plugin, name="test_priority_ct")
+ pm.register(rc_plugin, name="test_priority_render")
+ try:
+ db_directory = tmp_path_factory.mktemp("dbs")
+ db_path = str(db_directory / "data.db")
+ db = sqlite3.connect(str(db_path))
+ db.execute("vacuum")
+ db.execute("create table t (id integer primary key, name text)")
+ db.execute("insert into t values (1, 'hello')")
+ db.commit()
+ ds = Datasette(
+ [db_path],
+ config={
+ "databases": {
+ "data": {
+ "tables": {
+ "t": {
+ "column_types": {"name": "priority_test"}
+ }
+ }
+ }
+ }
+ },
+ )
+ await ds.invoke_startup()
+ response = await ds.client.get("/data/t.json?_extra=render_cell")
+ assert response.status_code == 200
+ data = response.json()
+ rendered = data["render_cell"][0]
+ # Column type should win over the plugin
+ assert "COLUMN_TYPE:" in rendered["name"]
+ assert "PLUGIN:" not in rendered["name"]
+ db.close()
+ for database in ds.databases.values():
+ if not database.is_memory:
+ database.close()
+ finally:
+ pm.unregister(ct_plugin, name="test_priority_ct")
+ pm.unregister(rc_plugin, name="test_priority_render")
+
+
+# --- Row detail page rendering ---
+
+
+@pytest.mark.asyncio
+async def test_row_detail_page_html_rendering(ds_ct):
+ """Row detail HTML page should use column type rendering."""
+ await ds_ct.invoke_startup()
+ response = await ds_ct.client.get("/data/posts/1")
+ assert response.status_code == 200
+ html = response.text
+ # The email column should be rendered with mailto: link
+ assert "mailto:test@example.com" in html
+ # The url column should be rendered with href
+ assert 'href="https://example.com"' in html
+
+
+# --- HTML table page rendering ---
+
+
+@pytest.mark.asyncio
+async def test_html_table_page_rendering(ds_ct):
+ """HTML table page should use column type rendering."""
+ await ds_ct.invoke_startup()
+ response = await ds_ct.client.get("/data/posts")
+ assert response.status_code == 200
+ html = response.text
+ assert "mailto:test@example.com" in html
+ assert 'href="https://example.com"' in html
+
+
+# --- Validation on upsert ---
+
+
+@pytest.mark.asyncio
+async def test_validation_on_upsert(ds_ct):
+ await ds_ct.invoke_startup()
+ token = write_token(ds_ct)
+ response = await ds_ct.client.post(
+ "/data/posts/-/upsert",
+ json={
+ "rows": [{"id": 1, "title": "Updated", "author_email": "invalid"}],
+ },
+ headers=_headers(token),
+ )
+ assert response.status_code == 400
+ assert "author_email" in response.json()["errors"][0]
+
+
+@pytest.mark.asyncio
+async def test_validation_on_upsert_passes_valid(ds_ct):
+ await ds_ct.invoke_startup()
+ token = write_token(ds_ct)
+ response = await ds_ct.client.post(
+ "/data/posts/-/upsert",
+ json={
+ "rows": [{"id": 1, "title": "Updated", "author_email": "valid@test.com"}],
+ },
+ headers=_headers(token),
+ )
+ assert response.status_code == 200
+
+
+# --- Unknown type warning logged ---
+
+
+@pytest.mark.asyncio
+async def test_unknown_type_warning_logged(tmp_path_factory, caplog):
+ db_directory = tmp_path_factory.mktemp("dbs")
+ db_path = str(db_directory / "data.db")
+ db = sqlite3.connect(str(db_path))
+ db.execute("vacuum")
+ db.execute("create table t (id integer primary key, col text)")
+ db.commit()
+ ds = Datasette(
+ [db_path],
+ config={
+ "databases": {
+ "data": {
+ "tables": {
+ "t": {
+ "column_types": {"col": "nonexistent_type"}
+ }
+ }
+ }
+ }
+ },
+ )
+ with caplog.at_level(logging.WARNING):
+ await ds.invoke_startup()
+ assert "unknown type" in caplog.text.lower()
+ assert "nonexistent_type" in caplog.text
+ db.close()
+ for database in ds.databases.values():
+ if not database.is_memory:
+ database.close()
+
+
+# --- Config overwrites on restart ---
+
+
+@pytest.mark.asyncio
+async def test_config_overwrites_on_restart(tmp_path_factory):
+ """Config values should overwrite any existing column types in internal DB on startup."""
+ db_directory = tmp_path_factory.mktemp("dbs")
+ db_path = str(db_directory / "data.db")
+ db = sqlite3.connect(str(db_path))
+ db.execute("vacuum")
+ db.execute("create table t (id integer primary key, col text)")
+ db.commit()
+ ds = Datasette(
+ [db_path],
+ config={
+ "databases": {
+ "data": {
+ "tables": {
+ "t": {
+ "column_types": {"col": "email"}
+ }
+ }
+ }
+ }
+ },
+ )
+ await ds.invoke_startup()
+ ct, _ = await ds.get_column_type("data", "t", "col")
+ assert ct == "email"
+
+ # Manually change the column type in the internal DB
+ await ds.set_column_type("data", "t", "col", "url")
+ ct, _ = await ds.get_column_type("data", "t", "col")
+ assert ct == "url"
+
+ # Re-apply config (simulating what happens on restart)
+ await ds._apply_column_types_config()
+ ct, _ = await ds.get_column_type("data", "t", "col")
+ assert ct == "email" # Config wins
+
+ db.close()
+ for database in ds.databases.values():
+ if not database.is_memory:
+ database.close()
+
+
+# --- No column_types in config ---
+
+
+@pytest.mark.asyncio
+async def test_no_column_types_in_config(tmp_path_factory):
+ """Datasette should work fine without any column_types configuration."""
+ db_directory = tmp_path_factory.mktemp("dbs")
+ db_path = str(db_directory / "data.db")
+ db = sqlite3.connect(str(db_path))
+ db.execute("vacuum")
+ db.execute("create table t (id integer primary key, col text)")
+ db.execute("insert into t values (1, 'hello')")
+ db.commit()
+ ds = Datasette([db_path])
+ await ds.invoke_startup()
+
+ # No column types assigned
+ ct_map = await ds.get_column_types("data", "t")
+ assert ct_map == {}
+
+ # JSON endpoint should work without column_types extra
+ response = await ds.client.get("/data/t.json")
+ assert response.status_code == 200
+ assert response.json()["rows"][0]["col"] == "hello"
+
+ # column_types extra should return empty
+ response = await ds.client.get("/data/t.json?_extra=column_types")
+ assert response.status_code == 200
+ assert response.json()["column_types"] == {}
+
+ db.close()
+ for database in ds.databases.values():
+ if not database.is_memory:
+ database.close()