From e8472bc0cde0b2186587c7739e0722e459eb270f Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 17 Mar 2026 02:48:55 +0000 Subject: [PATCH] Add missing tests and transform_value integration - Add transform_value integration in table JSON endpoint rows - Add tests for: duplicate type name error, row endpoint rendering, transform_value in JSON output, column type priority over plugins, row detail HTML rendering, table HTML rendering, upsert validation, unknown type warning logging, config overwrite on restart, and no-config edge case - Total: 34 column type tests, all passing https://claude.ai/code/session_01SvPEPqHgURTWESRp28pTC3 --- datasette/views/table.py | 15 +- tests/test_column_types.py | 348 +++++++++++++++++++++++++++++++++++++ 2 files changed, 362 insertions(+), 1 deletion(-) diff --git a/datasette/views/table.py b/datasette/views/table.py index 3c9b6656..20d78164 100644 --- a/datasette/views/table.py +++ b/datasette/views/table.py @@ -1851,7 +1851,20 @@ async def table_view_data( } ) raw_sqlite_rows = rows[:page_size] - data["rows"] = [dict(r) for r in raw_sqlite_rows] + # Apply transform_value for columns with assigned types + ct_map = await datasette.get_column_types(database_name, table_name) + transformed_rows = [] + for r in raw_sqlite_rows: + row_dict = dict(r) + for col_name, (ct_name, ct_config) in ct_map.items(): + if col_name in row_dict: + ct_class = datasette.get_column_type_class(ct_name) + if ct_class: + row_dict[col_name] = await ct_class.transform_value( + row_dict[col_name], ct_config, datasette + ) + transformed_rows.append(row_dict) + data["rows"] = transformed_rows if context_for_html_hack: data.update(extra_context_from_filters) diff --git a/tests/test_column_types.py b/tests/test_column_types.py index 3cbadf5e..efb8fbc7 100644 --- a/tests/test_column_types.py +++ b/tests/test_column_types.py @@ -1,7 +1,15 @@ +import logging + +import logging + from datasette.app import Datasette from datasette.column_types import ColumnType +from datasette.hookspecs import hookimpl +from datasette.plugins import pm from datasette.utils import sqlite3 +from datasette.utils import StartupError import json +import markupsafe import pytest import time @@ -367,3 +375,343 @@ async def test_render_cell_extra_with_column_types(ds_ct): rendered = data["render_cell"][0] assert "mailto:" in rendered["author_email"] assert "href" in rendered["website"] + + +# --- Duplicate column type name --- + + +@pytest.mark.asyncio +async def test_duplicate_column_type_name_raises_error(): + class DuplicateUrlType(ColumnType): + async def render_cell(self, value, column, table, database, datasette, request, config): + return None + + class _Plugin: + @hookimpl + def register_column_types(self, datasette): + return [DuplicateUrlType(name="url", description="Duplicate URL")] + + plugin = _Plugin() + pm.register(plugin, name="test_duplicate_ct") + try: + ds = Datasette() + with pytest.raises(StartupError, match="Duplicate column type name: url"): + await ds.invoke_startup() + finally: + pm.unregister(plugin, name="test_duplicate_ct") + + +# --- Row endpoint --- + + +@pytest.mark.asyncio +async def test_row_endpoint_render_cell_with_column_types(ds_ct): + await ds_ct.invoke_startup() + response = await ds_ct.client.get("/data/posts/1.json?_extra=render_cell") + assert response.status_code == 200 + data = response.json() + rendered = data["render_cell"][0] + assert "mailto:" in rendered["author_email"] + assert "href" in rendered["website"] + + +# --- transform_value in JSON output --- + + +@pytest.mark.asyncio +async def test_transform_value_in_json_output(tmp_path_factory): + """A column type with transform_value should modify rows in JSON API.""" + + class UpperColumnType(ColumnType): + async def transform_value(self, value, config, datasette): + if isinstance(value, str): + return value.upper() + return value + + class _Plugin: + @hookimpl + def register_column_types(self, datasette): + return [UpperColumnType(name="upper", description="Uppercase")] + + plugin = _Plugin() + pm.register(plugin, name="test_transform_ct") + try: + db_directory = tmp_path_factory.mktemp("dbs") + db_path = str(db_directory / "data.db") + db = sqlite3.connect(str(db_path)) + db.execute("vacuum") + db.execute("create table t (id integer primary key, name text)") + db.execute("insert into t values (1, 'hello')") + db.commit() + ds = Datasette( + [db_path], + config={ + "databases": { + "data": { + "tables": { + "t": { + "column_types": {"name": "upper"} + } + } + } + } + }, + ) + await ds.invoke_startup() + response = await ds.client.get("/data/t.json") + assert response.status_code == 200 + data = response.json() + assert data["rows"][0]["name"] == "HELLO" + db.close() + for database in ds.databases.values(): + if not database.is_memory: + database.close() + finally: + pm.unregister(plugin, name="test_transform_ct") + + +# --- Column type priority over plugins --- + + +@pytest.mark.asyncio +async def test_column_type_render_cell_has_priority_over_plugins(tmp_path_factory): + """Column type render_cell should take priority over render_cell plugin hook.""" + + class PriorityColumnType(ColumnType): + async def render_cell(self, value, column, table, database, datasette, request, config): + if value is not None: + return markupsafe.Markup(f"COLUMN_TYPE:{markupsafe.escape(value)}") + return None + + class _ColumnTypePlugin: + @hookimpl + def register_column_types(self, datasette): + return [PriorityColumnType(name="priority_test", description="Priority test")] + + class _RenderCellPlugin: + @hookimpl + def render_cell(self, row, value, column, table, pks, database, datasette, request, + column_type, column_type_config): + if column == "name": + return markupsafe.Markup(f"PLUGIN:{markupsafe.escape(value)}") + + ct_plugin = _ColumnTypePlugin() + rc_plugin = _RenderCellPlugin() + pm.register(ct_plugin, name="test_priority_ct") + pm.register(rc_plugin, name="test_priority_render") + try: + db_directory = tmp_path_factory.mktemp("dbs") + db_path = str(db_directory / "data.db") + db = sqlite3.connect(str(db_path)) + db.execute("vacuum") + db.execute("create table t (id integer primary key, name text)") + db.execute("insert into t values (1, 'hello')") + db.commit() + ds = Datasette( + [db_path], + config={ + "databases": { + "data": { + "tables": { + "t": { + "column_types": {"name": "priority_test"} + } + } + } + } + }, + ) + await ds.invoke_startup() + response = await ds.client.get("/data/t.json?_extra=render_cell") + assert response.status_code == 200 + data = response.json() + rendered = data["render_cell"][0] + # Column type should win over the plugin + assert "COLUMN_TYPE:" in rendered["name"] + assert "PLUGIN:" not in rendered["name"] + db.close() + for database in ds.databases.values(): + if not database.is_memory: + database.close() + finally: + pm.unregister(ct_plugin, name="test_priority_ct") + pm.unregister(rc_plugin, name="test_priority_render") + + +# --- Row detail page rendering --- + + +@pytest.mark.asyncio +async def test_row_detail_page_html_rendering(ds_ct): + """Row detail HTML page should use column type rendering.""" + await ds_ct.invoke_startup() + response = await ds_ct.client.get("/data/posts/1") + assert response.status_code == 200 + html = response.text + # The email column should be rendered with mailto: link + assert "mailto:test@example.com" in html + # The url column should be rendered with href + assert 'href="https://example.com"' in html + + +# --- HTML table page rendering --- + + +@pytest.mark.asyncio +async def test_html_table_page_rendering(ds_ct): + """HTML table page should use column type rendering.""" + await ds_ct.invoke_startup() + response = await ds_ct.client.get("/data/posts") + assert response.status_code == 200 + html = response.text + assert "mailto:test@example.com" in html + assert 'href="https://example.com"' in html + + +# --- Validation on upsert --- + + +@pytest.mark.asyncio +async def test_validation_on_upsert(ds_ct): + await ds_ct.invoke_startup() + token = write_token(ds_ct) + response = await ds_ct.client.post( + "/data/posts/-/upsert", + json={ + "rows": [{"id": 1, "title": "Updated", "author_email": "invalid"}], + }, + headers=_headers(token), + ) + assert response.status_code == 400 + assert "author_email" in response.json()["errors"][0] + + +@pytest.mark.asyncio +async def test_validation_on_upsert_passes_valid(ds_ct): + await ds_ct.invoke_startup() + token = write_token(ds_ct) + response = await ds_ct.client.post( + "/data/posts/-/upsert", + json={ + "rows": [{"id": 1, "title": "Updated", "author_email": "valid@test.com"}], + }, + headers=_headers(token), + ) + assert response.status_code == 200 + + +# --- Unknown type warning logged --- + + +@pytest.mark.asyncio +async def test_unknown_type_warning_logged(tmp_path_factory, caplog): + db_directory = tmp_path_factory.mktemp("dbs") + db_path = str(db_directory / "data.db") + db = sqlite3.connect(str(db_path)) + db.execute("vacuum") + db.execute("create table t (id integer primary key, col text)") + db.commit() + ds = Datasette( + [db_path], + config={ + "databases": { + "data": { + "tables": { + "t": { + "column_types": {"col": "nonexistent_type"} + } + } + } + } + }, + ) + with caplog.at_level(logging.WARNING): + await ds.invoke_startup() + assert "unknown type" in caplog.text.lower() + assert "nonexistent_type" in caplog.text + db.close() + for database in ds.databases.values(): + if not database.is_memory: + database.close() + + +# --- Config overwrites on restart --- + + +@pytest.mark.asyncio +async def test_config_overwrites_on_restart(tmp_path_factory): + """Config values should overwrite any existing column types in internal DB on startup.""" + db_directory = tmp_path_factory.mktemp("dbs") + db_path = str(db_directory / "data.db") + db = sqlite3.connect(str(db_path)) + db.execute("vacuum") + db.execute("create table t (id integer primary key, col text)") + db.commit() + ds = Datasette( + [db_path], + config={ + "databases": { + "data": { + "tables": { + "t": { + "column_types": {"col": "email"} + } + } + } + } + }, + ) + await ds.invoke_startup() + ct, _ = await ds.get_column_type("data", "t", "col") + assert ct == "email" + + # Manually change the column type in the internal DB + await ds.set_column_type("data", "t", "col", "url") + ct, _ = await ds.get_column_type("data", "t", "col") + assert ct == "url" + + # Re-apply config (simulating what happens on restart) + await ds._apply_column_types_config() + ct, _ = await ds.get_column_type("data", "t", "col") + assert ct == "email" # Config wins + + db.close() + for database in ds.databases.values(): + if not database.is_memory: + database.close() + + +# --- No column_types in config --- + + +@pytest.mark.asyncio +async def test_no_column_types_in_config(tmp_path_factory): + """Datasette should work fine without any column_types configuration.""" + db_directory = tmp_path_factory.mktemp("dbs") + db_path = str(db_directory / "data.db") + db = sqlite3.connect(str(db_path)) + db.execute("vacuum") + db.execute("create table t (id integer primary key, col text)") + db.execute("insert into t values (1, 'hello')") + db.commit() + ds = Datasette([db_path]) + await ds.invoke_startup() + + # No column types assigned + ct_map = await ds.get_column_types("data", "t") + assert ct_map == {} + + # JSON endpoint should work without column_types extra + response = await ds.client.get("/data/t.json") + assert response.status_code == 200 + assert response.json()["rows"][0]["col"] == "hello" + + # column_types extra should return empty + response = await ds.client.get("/data/t.json?_extra=column_types") + assert response.status_code == 200 + assert response.json()["column_types"] == {} + + db.close() + for database in ds.databases.values(): + if not database.is_memory: + database.close()