Add missing tests and transform_value integration

- Add transform_value integration in table JSON endpoint rows
- Add tests for: duplicate type name error, row endpoint rendering,
  transform_value in JSON output, column type priority over plugins,
  row detail HTML rendering, table HTML rendering, upsert validation,
  unknown type warning logging, config overwrite on restart, and
  no-config edge case
- Total: 34 column type tests, all passing

https://claude.ai/code/session_01SvPEPqHgURTWESRp28pTC3
This commit is contained in:
Claude 2026-03-17 02:48:55 +00:00
commit e8472bc0cd
No known key found for this signature in database
2 changed files with 362 additions and 1 deletions

View file

@ -1851,7 +1851,20 @@ async def table_view_data(
}
)
raw_sqlite_rows = rows[:page_size]
data["rows"] = [dict(r) for r in raw_sqlite_rows]
# Apply transform_value for columns with assigned types
ct_map = await datasette.get_column_types(database_name, table_name)
transformed_rows = []
for r in raw_sqlite_rows:
row_dict = dict(r)
for col_name, (ct_name, ct_config) in ct_map.items():
if col_name in row_dict:
ct_class = datasette.get_column_type_class(ct_name)
if ct_class:
row_dict[col_name] = await ct_class.transform_value(
row_dict[col_name], ct_config, datasette
)
transformed_rows.append(row_dict)
data["rows"] = transformed_rows
if context_for_html_hack:
data.update(extra_context_from_filters)

View file

@ -1,7 +1,15 @@
import logging
import logging
from datasette.app import Datasette
from datasette.column_types import ColumnType
from datasette.hookspecs import hookimpl
from datasette.plugins import pm
from datasette.utils import sqlite3
from datasette.utils import StartupError
import json
import markupsafe
import pytest
import time
@ -367,3 +375,343 @@ async def test_render_cell_extra_with_column_types(ds_ct):
rendered = data["render_cell"][0]
assert "mailto:" in rendered["author_email"]
assert "href" in rendered["website"]
# --- Duplicate column type name ---
@pytest.mark.asyncio
async def test_duplicate_column_type_name_raises_error():
class DuplicateUrlType(ColumnType):
async def render_cell(self, value, column, table, database, datasette, request, config):
return None
class _Plugin:
@hookimpl
def register_column_types(self, datasette):
return [DuplicateUrlType(name="url", description="Duplicate URL")]
plugin = _Plugin()
pm.register(plugin, name="test_duplicate_ct")
try:
ds = Datasette()
with pytest.raises(StartupError, match="Duplicate column type name: url"):
await ds.invoke_startup()
finally:
pm.unregister(plugin, name="test_duplicate_ct")
# --- Row endpoint ---
@pytest.mark.asyncio
async def test_row_endpoint_render_cell_with_column_types(ds_ct):
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts/1.json?_extra=render_cell")
assert response.status_code == 200
data = response.json()
rendered = data["render_cell"][0]
assert "mailto:" in rendered["author_email"]
assert "href" in rendered["website"]
# --- transform_value in JSON output ---
@pytest.mark.asyncio
async def test_transform_value_in_json_output(tmp_path_factory):
"""A column type with transform_value should modify rows in JSON API."""
class UpperColumnType(ColumnType):
async def transform_value(self, value, config, datasette):
if isinstance(value, str):
return value.upper()
return value
class _Plugin:
@hookimpl
def register_column_types(self, datasette):
return [UpperColumnType(name="upper", description="Uppercase")]
plugin = _Plugin()
pm.register(plugin, name="test_transform_ct")
try:
db_directory = tmp_path_factory.mktemp("dbs")
db_path = str(db_directory / "data.db")
db = sqlite3.connect(str(db_path))
db.execute("vacuum")
db.execute("create table t (id integer primary key, name text)")
db.execute("insert into t values (1, 'hello')")
db.commit()
ds = Datasette(
[db_path],
config={
"databases": {
"data": {
"tables": {
"t": {
"column_types": {"name": "upper"}
}
}
}
}
},
)
await ds.invoke_startup()
response = await ds.client.get("/data/t.json")
assert response.status_code == 200
data = response.json()
assert data["rows"][0]["name"] == "HELLO"
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
finally:
pm.unregister(plugin, name="test_transform_ct")
# --- Column type priority over plugins ---
@pytest.mark.asyncio
async def test_column_type_render_cell_has_priority_over_plugins(tmp_path_factory):
"""Column type render_cell should take priority over render_cell plugin hook."""
class PriorityColumnType(ColumnType):
async def render_cell(self, value, column, table, database, datasette, request, config):
if value is not None:
return markupsafe.Markup(f"<b>COLUMN_TYPE:{markupsafe.escape(value)}</b>")
return None
class _ColumnTypePlugin:
@hookimpl
def register_column_types(self, datasette):
return [PriorityColumnType(name="priority_test", description="Priority test")]
class _RenderCellPlugin:
@hookimpl
def render_cell(self, row, value, column, table, pks, database, datasette, request,
column_type, column_type_config):
if column == "name":
return markupsafe.Markup(f"<i>PLUGIN:{markupsafe.escape(value)}</i>")
ct_plugin = _ColumnTypePlugin()
rc_plugin = _RenderCellPlugin()
pm.register(ct_plugin, name="test_priority_ct")
pm.register(rc_plugin, name="test_priority_render")
try:
db_directory = tmp_path_factory.mktemp("dbs")
db_path = str(db_directory / "data.db")
db = sqlite3.connect(str(db_path))
db.execute("vacuum")
db.execute("create table t (id integer primary key, name text)")
db.execute("insert into t values (1, 'hello')")
db.commit()
ds = Datasette(
[db_path],
config={
"databases": {
"data": {
"tables": {
"t": {
"column_types": {"name": "priority_test"}
}
}
}
}
},
)
await ds.invoke_startup()
response = await ds.client.get("/data/t.json?_extra=render_cell")
assert response.status_code == 200
data = response.json()
rendered = data["render_cell"][0]
# Column type should win over the plugin
assert "COLUMN_TYPE:" in rendered["name"]
assert "PLUGIN:" not in rendered["name"]
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
finally:
pm.unregister(ct_plugin, name="test_priority_ct")
pm.unregister(rc_plugin, name="test_priority_render")
# --- Row detail page rendering ---
@pytest.mark.asyncio
async def test_row_detail_page_html_rendering(ds_ct):
"""Row detail HTML page should use column type rendering."""
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts/1")
assert response.status_code == 200
html = response.text
# The email column should be rendered with mailto: link
assert "mailto:test@example.com" in html
# The url column should be rendered with href
assert 'href="https://example.com"' in html
# --- HTML table page rendering ---
@pytest.mark.asyncio
async def test_html_table_page_rendering(ds_ct):
"""HTML table page should use column type rendering."""
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts")
assert response.status_code == 200
html = response.text
assert "mailto:test@example.com" in html
assert 'href="https://example.com"' in html
# --- Validation on upsert ---
@pytest.mark.asyncio
async def test_validation_on_upsert(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/-/upsert",
json={
"rows": [{"id": 1, "title": "Updated", "author_email": "invalid"}],
},
headers=_headers(token),
)
assert response.status_code == 400
assert "author_email" in response.json()["errors"][0]
@pytest.mark.asyncio
async def test_validation_on_upsert_passes_valid(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/-/upsert",
json={
"rows": [{"id": 1, "title": "Updated", "author_email": "valid@test.com"}],
},
headers=_headers(token),
)
assert response.status_code == 200
# --- Unknown type warning logged ---
@pytest.mark.asyncio
async def test_unknown_type_warning_logged(tmp_path_factory, caplog):
db_directory = tmp_path_factory.mktemp("dbs")
db_path = str(db_directory / "data.db")
db = sqlite3.connect(str(db_path))
db.execute("vacuum")
db.execute("create table t (id integer primary key, col text)")
db.commit()
ds = Datasette(
[db_path],
config={
"databases": {
"data": {
"tables": {
"t": {
"column_types": {"col": "nonexistent_type"}
}
}
}
}
},
)
with caplog.at_level(logging.WARNING):
await ds.invoke_startup()
assert "unknown type" in caplog.text.lower()
assert "nonexistent_type" in caplog.text
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
# --- Config overwrites on restart ---
@pytest.mark.asyncio
async def test_config_overwrites_on_restart(tmp_path_factory):
"""Config values should overwrite any existing column types in internal DB on startup."""
db_directory = tmp_path_factory.mktemp("dbs")
db_path = str(db_directory / "data.db")
db = sqlite3.connect(str(db_path))
db.execute("vacuum")
db.execute("create table t (id integer primary key, col text)")
db.commit()
ds = Datasette(
[db_path],
config={
"databases": {
"data": {
"tables": {
"t": {
"column_types": {"col": "email"}
}
}
}
}
},
)
await ds.invoke_startup()
ct, _ = await ds.get_column_type("data", "t", "col")
assert ct == "email"
# Manually change the column type in the internal DB
await ds.set_column_type("data", "t", "col", "url")
ct, _ = await ds.get_column_type("data", "t", "col")
assert ct == "url"
# Re-apply config (simulating what happens on restart)
await ds._apply_column_types_config()
ct, _ = await ds.get_column_type("data", "t", "col")
assert ct == "email" # Config wins
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
# --- No column_types in config ---
@pytest.mark.asyncio
async def test_no_column_types_in_config(tmp_path_factory):
"""Datasette should work fine without any column_types configuration."""
db_directory = tmp_path_factory.mktemp("dbs")
db_path = str(db_directory / "data.db")
db = sqlite3.connect(str(db_path))
db.execute("vacuum")
db.execute("create table t (id integer primary key, col text)")
db.execute("insert into t values (1, 'hello')")
db.commit()
ds = Datasette([db_path])
await ds.invoke_startup()
# No column types assigned
ct_map = await ds.get_column_types("data", "t")
assert ct_map == {}
# JSON endpoint should work without column_types extra
response = await ds.client.get("/data/t.json")
assert response.status_code == 200
assert response.json()["rows"][0]["col"] == "hello"
# column_types extra should return empty
response = await ds.client.get("/data/t.json?_extra=column_types")
assert response.status_code == 200
assert response.json()["column_types"] == {}
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()