Add column types system for semantic column annotations

Implements the column types feature that lets Datasette and plugins annotate
columns with semantic types beyond SQLite storage types (e.g. markdown, email,
url, json, file, point). This enables type-appropriate rendering, validation,
form widgets, and API behavior.

Key changes:
- New `column_types` internal DB table for storing assignments
- `ColumnType` dataclass in datasette/column_types.py with render_cell,
  validate, and transform_value methods
- `register_column_types` plugin hook for registering types
- Built-in url, email, and json column types
- Datasette API methods: get/set/remove_column_type(s),
  get_column_type_class
- Config loading from datasette.json `column_types` table config key
- `column_types` extra on the table JSON endpoint
- Column type info in display_columns extra
- Column type render_cell gets priority in rendering pipeline
- column_type/column_type_config args added to render_cell hookspec
- Write-path validation on insert and update

https://claude.ai/code/session_01SvPEPqHgURTWESRp28pTC3
This commit is contained in:
Claude 2026-03-17 02:40:37 +00:00
commit 73225ccad0
No known key found for this signature in database
11 changed files with 781 additions and 58 deletions

View file

@ -354,6 +354,7 @@ class Datasette:
self.immutables = set(immutables or [])
self.databases = collections.OrderedDict()
self.actions = {} # .invoke_startup() will populate this
self._column_types = {} # .invoke_startup() will populate this
try:
self._refresh_schemas_lock = asyncio.Lock()
except RuntimeError as rex:
@ -692,12 +693,25 @@ class Datasette:
action_abbrs[action.abbr] = action
self.actions[action.name] = action
# Register column types
self._column_types = {}
for hook in pm.hook.register_column_types(datasette=self):
if hook:
for ct in hook:
if ct.name in self._column_types:
raise StartupError(
f"Duplicate column type name: {ct.name}"
)
self._column_types[ct.name] = ct
for hook in pm.hook.prepare_jinja2_environment(
env=self._jinja_env, datasette=self
):
await await_me_maybe(hook)
# Ensure internal tables and metadata are populated before startup hooks
await self._refresh_schemas()
# Load column_types from config into internal DB
await self._apply_column_types_config()
for hook in pm.hook.startup(datasette=self):
await await_me_maybe(hook)
self._startup_invoked = True
@ -945,6 +959,95 @@ class Datasette:
[database_name, resource_name, column_name, key, value],
)
# Column types API
async def _apply_column_types_config(self):
"""Load column_types from datasette.json config into the internal DB."""
import logging
for db_name, db_conf in (self.config or {}).get("databases", {}).items():
for table_name, table_conf in db_conf.get("tables", {}).items():
for col_name, ct in table_conf.get("column_types", {}).items():
if isinstance(ct, str):
col_type, config = ct, None
else:
col_type = ct["type"]
config = ct.get("config")
if col_type not in self._column_types:
logging.warning(
"column_types config references unknown type %r "
"for %s.%s.%s",
col_type, db_name, table_name, col_name,
)
await self.set_column_type(
db_name, table_name, col_name, col_type, config
)
async def get_column_type(
self, database: str, resource: str, column: str
) -> tuple:
"""
Return (column_type_name, config_dict) for a specific column,
or (None, None) if no column type is assigned.
"""
row = await self.get_internal_database().execute(
"SELECT column_type, config FROM column_types "
"WHERE database_name = ? AND resource_name = ? AND column_name = ?",
[database, resource, column],
)
rows = row.rows
if not rows:
return None, None
ct, config = rows[0]
return (ct, json.loads(config) if config else None)
async def get_column_types(
self, database: str, resource: str
) -> dict:
"""
Return {column_name: (column_type_name, config_dict_or_None)}
for all columns with assigned types on the given resource.
"""
rows = await self.get_internal_database().execute(
"SELECT column_name, column_type, config FROM column_types "
"WHERE database_name = ? AND resource_name = ?",
[database, resource],
)
return {
row[0]: (row[1], json.loads(row[2]) if row[2] else None)
for row in rows.rows
}
async def set_column_type(
self, database: str, resource: str, column: str,
column_type: str, config: dict = None
) -> None:
"""Assign a column type. Overwrites any existing assignment."""
await self.get_internal_database().execute_write(
"""INSERT OR REPLACE INTO column_types
(database_name, resource_name, column_name, column_type, config)
VALUES (?, ?, ?, ?, ?)""",
[database, resource, column, column_type,
json.dumps(config) if config else None],
)
async def remove_column_type(
self, database: str, resource: str, column: str
) -> None:
"""Remove a column type assignment."""
await self.get_internal_database().execute_write(
"DELETE FROM column_types "
"WHERE database_name = ? AND resource_name = ? AND column_name = ?",
[database, resource, column],
)
def get_column_type_class(self, column_type_name: str):
"""
Return the registered ColumnType instance for a given name,
or None if no plugin has registered that name.
"""
return self._column_types.get(column_type_name)
def get_internal_database(self):
return self._internal_database

42
datasette/column_types.py Normal file
View file

@ -0,0 +1,42 @@
from dataclasses import dataclass
@dataclass(frozen=True, kw_only=True)
class ColumnType:
name: str
"""
Unique identifier string. Lowercase, no spaces.
Examples: "markdown", "file", "email", "url", "point", "image".
"""
description: str
"""
Human-readable label for admin UI dropdowns.
Examples: "Markdown text", "File reference", "Email address".
"""
async def render_cell(
self, value, column, table, database, datasette, request, config
):
"""
Return an HTML string to render this cell value, or None to
fall through to the default render_cell plugin hook chain.
``config`` is the parsed JSON config dict for this specific
column assignment, or None.
"""
return None
async def validate(self, value, config, datasette):
"""
Validate a value before it is written. Return None if valid,
or a string error message if invalid.
"""
return None
async def transform_value(self, value, config, datasette):
"""
Transform a value before it appears in JSON API output.
Return the transformed value. Default: return unchanged.
"""
return value

View file

@ -0,0 +1,82 @@
import json
import re
import markupsafe
from datasette import hookimpl
from datasette.column_types import ColumnType
class UrlColumnType(ColumnType):
async def render_cell(
self, value, column, table, database, datasette, request, config
):
if not value or not isinstance(value, str):
return None
escaped = markupsafe.escape(value.strip())
return markupsafe.Markup(f'<a href="{escaped}">{escaped}</a>')
async def validate(self, value, config, datasette):
if value is None or value == "":
return None
if not isinstance(value, str):
return "URL must be a string"
if not re.match(r"^https?://\S+$", value.strip()):
return "Invalid URL"
return None
class EmailColumnType(ColumnType):
async def render_cell(
self, value, column, table, database, datasette, request, config
):
if not value or not isinstance(value, str):
return None
escaped = markupsafe.escape(value.strip())
return markupsafe.Markup(f'<a href="mailto:{escaped}">{escaped}</a>')
async def validate(self, value, config, datasette):
if value is None or value == "":
return None
if not isinstance(value, str):
return "Email must be a string"
if not re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", value.strip()):
return "Invalid email address"
return None
class JsonColumnType(ColumnType):
async def render_cell(
self, value, column, table, database, datasette, request, config
):
if value is None:
return None
try:
parsed = json.loads(value) if isinstance(value, str) else value
formatted = json.dumps(parsed, indent=2)
escaped = markupsafe.escape(formatted)
return markupsafe.Markup(f"<pre>{escaped}</pre>")
except (json.JSONDecodeError, TypeError):
return None
async def validate(self, value, config, datasette):
if value is None or value == "":
return None
if isinstance(value, str):
try:
json.loads(value)
except json.JSONDecodeError:
return "Invalid JSON"
return None
@hookimpl
def register_column_types(datasette):
return [
UrlColumnType(name="url", description="URL"),
EmailColumnType(name="email", description="Email address"),
JsonColumnType(name="json", description="JSON data"),
]

View file

@ -55,7 +55,10 @@ def publish_subcommand(publish):
@hookspec
def render_cell(row, value, column, table, pks, database, datasette, request):
def render_cell(
row, value, column, table, pks, database, datasette, request,
column_type, column_type_config
):
"""Customize rendering of HTML table cell values"""
@ -74,6 +77,11 @@ def register_actions(datasette):
"""Register actions: returns a list of datasette.permission.Action objects"""
@hookspec
def register_column_types(datasette):
"""Return a list of ColumnType instances"""
@hookspec
def register_routes(datasette):
"""Register URL routes: return a list of (regex, view_function) pairs"""

View file

@ -25,6 +25,7 @@ DEFAULT_PLUGINS = (
"datasette.default_permissions",
"datasette.default_permissions.tokens",
"datasette.default_actions",
"datasette.default_column_types",
"datasette.default_magic_parameters",
"datasette.blob_renderer",
"datasette.default_menu_links",

View file

@ -103,6 +103,15 @@ async def initialize_metadata_tables(db):
value text,
unique(database_name, resource_name, column_name, key)
);
CREATE TABLE IF NOT EXISTS column_types (
database_name TEXT,
resource_name TEXT,
column_name TEXT,
column_type TEXT NOT NULL,
config TEXT,
PRIMARY KEY (database_name, resource_name, column_name)
);
"""))

View file

@ -1205,6 +1205,8 @@ async def display_rows(datasette, database, request, rows, columns):
database=database,
datasette=datasette,
request=request,
column_type=None,
column_type_config=None,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:

View file

@ -179,26 +179,38 @@ class RowView(DataView):
if "render_cell" in extras:
# Call render_cell plugin hook for each cell
ct_map = await self.ds.get_column_types(database, table)
rendered_rows = []
for row in rows:
rendered_row = {}
for value, column in zip(row, columns):
# Call render_cell plugin hook
ct_info = ct_map.get(column)
ct_name = ct_info[0] if ct_info else None
ct_config = ct_info[1] if ct_info else None
plugin_display_value = None
for candidate in pm.hook.render_cell(
row=row,
value=value,
column=column,
table=table,
pks=resolved.pks,
database=database,
datasette=self.ds,
request=request,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
plugin_display_value = candidate
break
# Try column type render_cell first
if ct_name:
ct_class = self.ds.get_column_type_class(ct_name)
if ct_class:
candidate = await ct_class.render_cell(
value=value, column=column, table=table,
database=database, datasette=self.ds,
request=request, config=ct_config,
)
if candidate is not None:
plugin_display_value = candidate
if plugin_display_value is None:
for candidate in pm.hook.render_cell(
row=row, value=value, column=column,
table=table, pks=resolved.pks,
database=database, datasette=self.ds,
request=request, column_type=ct_name,
column_type_config=ct_config,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
plugin_display_value = candidate
break
if plugin_display_value:
rendered_row[column] = str(plugin_display_value)
rendered_rows.append(rendered_row)
@ -352,6 +364,14 @@ class RowUpdateView(BaseView):
update = data["update"]
# Validate column types
from datasette.views.table import _validate_column_types
ct_errors = await _validate_column_types(
self.ds, resolved.db.name, resolved.table, [update]
)
if ct_errors:
return _error(ct_errors, 400)
alter = data.get("alter")
if alter and not await self.ds.allowed(
action="alter-table",

View file

@ -134,6 +134,25 @@ async def _redirect_if_needed(datasette, request, resolved):
)
async def _validate_column_types(datasette, database_name, table_name, rows):
"""Validate row values against assigned column types. Returns list of error strings."""
ct_map = await datasette.get_column_types(database_name, table_name)
if not ct_map:
return []
errors = []
for row in rows:
for col_name, (ct_name, ct_config) in ct_map.items():
if col_name not in row:
continue
ct_class = datasette.get_column_type_class(ct_name)
if ct_class is None:
continue
error = await ct_class.validate(row[col_name], ct_config, datasette)
if error:
errors.append(f"{col_name}: {error}")
return errors
async def display_columns_and_rows(
datasette,
database_name,
@ -163,6 +182,9 @@ async def display_columns_and_rows(
)
)
# Look up column types for this table
column_types_map = await datasette.get_column_types(database_name, table_name)
column_details = {
col.name: col for col in await db.table_column_details(table_name)
}
@ -179,16 +201,22 @@ async def display_columns_and_rows(
else:
type_ = column_details[r[0]].type
notnull = column_details[r[0]].notnull
columns.append(
{
"name": r[0],
"sortable": r[0] in sortable_columns,
"is_pk": r[0] in pks_for_display,
"type": type_,
"notnull": notnull,
"description": column_descriptions.get(r[0]),
}
)
col_dict = {
"name": r[0],
"sortable": r[0] in sortable_columns,
"is_pk": r[0] in pks_for_display,
"type": type_,
"notnull": notnull,
"description": column_descriptions.get(r[0]),
}
ct_info = column_types_map.get(r[0])
if ct_info:
col_dict["column_type"] = ct_info[0]
col_dict["column_type_config"] = ct_info[1]
else:
col_dict["column_type"] = None
col_dict["column_type_config"] = None
columns.append(col_dict)
column_to_foreign_key_table = {
fk["column"]: fk["other_table"]
@ -227,23 +255,42 @@ async def display_columns_and_rows(
# already shown in the link column.
continue
# First let the plugins have a go
# First try column type render_cell, then plugins
# pylint: disable=no-member
plugin_display_value = None
for candidate in pm.hook.render_cell(
row=row,
value=value,
column=column,
table=table_name,
pks=pks_for_display,
database=database_name,
datasette=datasette,
request=request,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
plugin_display_value = candidate
break
ct_name = column_dict.get("column_type")
ct_config = column_dict.get("column_type_config")
if ct_name:
ct_class = datasette.get_column_type_class(ct_name)
if ct_class:
candidate = await ct_class.render_cell(
value=value,
column=column,
table=table_name,
database=database_name,
datasette=datasette,
request=request,
config=ct_config,
)
if candidate is not None:
plugin_display_value = candidate
if plugin_display_value is None:
for candidate in pm.hook.render_cell(
row=row,
value=value,
column=column,
table=table_name,
pks=pks_for_display,
database=database_name,
datasette=datasette,
request=request,
column_type=ct_name,
column_type_config=ct_config,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
plugin_display_value = candidate
break
if plugin_display_value:
display_value = plugin_display_value
elif isinstance(value, bytes):
@ -484,6 +531,11 @@ class TableInsertView(BaseView):
if errors:
return _error(errors, 400)
# Validate column types
ct_errors = await _validate_column_types(self.ds, database_name, table_name, rows)
if ct_errors:
return _error(ct_errors, 400)
num_rows = len(rows)
# No that we've passed pks to _validate_data it's safe to
@ -1500,27 +1552,39 @@ async def table_view_data(
async def extra_render_cell():
"Rendered HTML for each cell using the render_cell plugin hook"
pks_for_display = pks if pks else (["rowid"] if not is_view else [])
columns = [col[0] for col in results.description]
col_names = [col[0] for col in results.description]
ct_map = await datasette.get_column_types(database_name, table_name)
rendered_rows = []
for row in rows:
rendered_row = {}
for value, column in zip(row, columns):
# Call render_cell plugin hook
for value, column in zip(row, col_names):
ct_info = ct_map.get(column)
ct_name = ct_info[0] if ct_info else None
ct_config = ct_info[1] if ct_info else None
plugin_display_value = None
for candidate in pm.hook.render_cell(
row=row,
value=value,
column=column,
table=table_name,
pks=pks_for_display,
database=database_name,
datasette=datasette,
request=request,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
plugin_display_value = candidate
break
# Try column type render_cell first
if ct_name:
ct_class = datasette.get_column_type_class(ct_name)
if ct_class:
candidate = await ct_class.render_cell(
value=value, column=column, table=table_name,
database=database_name, datasette=datasette,
request=request, config=ct_config,
)
if candidate is not None:
plugin_display_value = candidate
if plugin_display_value is None:
for candidate in pm.hook.render_cell(
row=row, value=value, column=column,
table=table_name, pks=pks_for_display,
database=database_name, datasette=datasette,
request=request, column_type=ct_name,
column_type_config=ct_config,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
plugin_display_value = candidate
break
if plugin_display_value:
rendered_row[column] = str(plugin_display_value)
rendered_rows.append(rendered_row)
@ -1533,6 +1597,17 @@ async def table_view_data(
"params": params,
}
async def extra_column_types():
"Column type assignments for this table"
ct_map = await datasette.get_column_types(database_name, table_name)
return {
col_name: {
"type": ct_name,
"config": ct_config,
}
for col_name, (ct_name, ct_config) in ct_map.items()
}
async def extra_metadata():
"Metadata about the table and database"
tablemetadata = await datasette.get_resource_metadata(database_name, table_name)
@ -1742,6 +1817,7 @@ async def table_view_data(
extra_debug,
extra_request,
extra_query,
extra_column_types,
extra_metadata,
extra_extras,
extra_database,

369
tests/test_column_types.py Normal file
View file

@ -0,0 +1,369 @@
from datasette.app import Datasette
from datasette.column_types import ColumnType
from datasette.utils import sqlite3
import json
import pytest
import time
@pytest.fixture
def ds_ct(tmp_path_factory):
db_directory = tmp_path_factory.mktemp("dbs")
db_path = str(db_directory / "data.db")
db = sqlite3.connect(str(db_path))
db.execute("vacuum")
db.execute(
"create table posts (id integer primary key, title text, body text, "
"author_email text, website text, metadata text)"
)
db.execute(
"insert into posts values (1, 'Hello', '# World', 'test@example.com', "
"'https://example.com', '{\"key\": \"value\"}')"
)
db.commit()
ds = Datasette(
[db_path],
config={
"databases": {
"data": {
"tables": {
"posts": {
"column_types": {
"body": "markdown",
"author_email": "email",
"website": "url",
"metadata": "json",
}
}
}
}
}
},
)
ds.root_enabled = True
yield ds
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
def write_token(ds, actor_id="root", permissions=None):
to_sign = {"a": actor_id, "token": "dstok", "t": int(time.time())}
if permissions:
to_sign["_r"] = {"a": permissions}
return "dstok_{}".format(ds.sign(to_sign, namespace="token"))
def _headers(token):
return {
"Authorization": "Bearer {}".format(token),
"Content-Type": "application/json",
}
# --- Internal DB and config loading ---
@pytest.mark.asyncio
async def test_column_types_table_created(ds_ct):
await ds_ct.invoke_startup()
internal = ds_ct.get_internal_database()
result = await internal.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='column_types'"
)
assert len(result.rows) == 1
@pytest.mark.asyncio
async def test_config_loaded_into_internal_db(ds_ct):
await ds_ct.invoke_startup()
ct_map = await ds_ct.get_column_types("data", "posts")
assert "body" in ct_map
assert ct_map["body"] == ("markdown", None)
assert ct_map["author_email"] == ("email", None)
assert ct_map["website"] == ("url", None)
assert ct_map["metadata"] == ("json", None)
@pytest.mark.asyncio
async def test_config_with_type_and_config(tmp_path_factory):
db_directory = tmp_path_factory.mktemp("dbs")
db_path = str(db_directory / "data.db")
db = sqlite3.connect(str(db_path))
db.execute("vacuum")
db.execute("create table geo (id integer primary key, location text)")
ds = Datasette(
[db_path],
config={
"databases": {
"data": {
"tables": {
"geo": {
"column_types": {
"location": {
"type": "point",
"config": {"srid": 4326},
}
}
}
}
}
}
},
)
await ds.invoke_startup()
ct, config = await ds.get_column_type("data", "geo", "location")
assert ct == "point"
assert config == {"srid": 4326}
db.close()
for database in ds.databases.values():
if not database.is_memory:
database.close()
# --- Datasette API methods ---
@pytest.mark.asyncio
async def test_get_column_type(ds_ct):
await ds_ct.invoke_startup()
ct, config = await ds_ct.get_column_type("data", "posts", "author_email")
assert ct == "email"
assert config is None
@pytest.mark.asyncio
async def test_get_column_type_missing(ds_ct):
await ds_ct.invoke_startup()
ct, config = await ds_ct.get_column_type("data", "posts", "title")
assert ct is None
assert config is None
@pytest.mark.asyncio
async def test_set_and_remove_column_type(ds_ct):
await ds_ct.invoke_startup()
await ds_ct.set_column_type("data", "posts", "title", "markdown")
ct, config = await ds_ct.get_column_type("data", "posts", "title")
assert ct == "markdown"
assert config is None
await ds_ct.remove_column_type("data", "posts", "title")
ct, config = await ds_ct.get_column_type("data", "posts", "title")
assert ct is None
@pytest.mark.asyncio
async def test_set_column_type_with_config(ds_ct):
await ds_ct.invoke_startup()
await ds_ct.set_column_type("data", "posts", "title", "file", {"accept": "image/*"})
ct, config = await ds_ct.get_column_type("data", "posts", "title")
assert ct == "file"
assert config == {"accept": "image/*"}
# --- Plugin registration ---
@pytest.mark.asyncio
async def test_builtin_column_types_registered(ds_ct):
await ds_ct.invoke_startup()
assert ds_ct.get_column_type_class("url") is not None
assert ds_ct.get_column_type_class("email") is not None
assert ds_ct.get_column_type_class("json") is not None
assert ds_ct.get_column_type_class("nonexistent") is None
@pytest.mark.asyncio
async def test_column_type_class_attributes(ds_ct):
await ds_ct.invoke_startup()
url_type = ds_ct.get_column_type_class("url")
assert url_type.name == "url"
assert url_type.description == "URL"
email_type = ds_ct.get_column_type_class("email")
assert email_type.name == "email"
assert email_type.description == "Email address"
# --- JSON API ---
@pytest.mark.asyncio
async def test_column_types_extra(ds_ct):
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts.json?_extra=column_types")
assert response.status_code == 200
data = response.json()
assert "column_types" in data
assert data["column_types"]["body"] == {"type": "markdown", "config": None}
assert data["column_types"]["author_email"] == {"type": "email", "config": None}
assert data["column_types"]["website"] == {"type": "url", "config": None}
# title has no column type, should not appear
assert "title" not in data["column_types"]
@pytest.mark.asyncio
async def test_display_columns_include_column_type(ds_ct):
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts.json?_extra=display_columns")
assert response.status_code == 200
data = response.json()
cols = {c["name"]: c for c in data["display_columns"]}
assert cols["author_email"]["column_type"] == "email"
assert cols["author_email"]["column_type_config"] is None
assert cols["website"]["column_type"] == "url"
assert cols["title"]["column_type"] is None
# --- Rendering ---
@pytest.mark.asyncio
async def test_url_render_cell(ds_ct):
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts.json?_extra=render_cell")
assert response.status_code == 200
data = response.json()
rendered = data["render_cell"][0]
assert "href" in rendered["website"]
assert "https://example.com" in rendered["website"]
@pytest.mark.asyncio
async def test_email_render_cell(ds_ct):
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts.json?_extra=render_cell")
assert response.status_code == 200
data = response.json()
rendered = data["render_cell"][0]
assert "mailto:" in rendered["author_email"]
assert "test@example.com" in rendered["author_email"]
@pytest.mark.asyncio
async def test_json_render_cell(ds_ct):
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts.json?_extra=render_cell")
assert response.status_code == 200
data = response.json()
rendered = data["render_cell"][0]
assert "<pre>" in rendered["metadata"]
# --- Validation ---
@pytest.mark.asyncio
async def test_email_validation_on_insert(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/-/insert",
json={"row": {"title": "Test", "author_email": "not-an-email"}},
headers=_headers(token),
)
assert response.status_code == 400
assert "author_email" in response.json()["errors"][0]
@pytest.mark.asyncio
async def test_email_validation_passes_valid(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/-/insert",
json={"row": {"title": "Test", "author_email": "valid@example.com"}},
headers=_headers(token),
)
assert response.status_code == 201
@pytest.mark.asyncio
async def test_url_validation_on_insert(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/-/insert",
json={"row": {"title": "Test", "website": "not-a-url"}},
headers=_headers(token),
)
assert response.status_code == 400
assert "website" in response.json()["errors"][0]
@pytest.mark.asyncio
async def test_json_validation_on_insert(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/-/insert",
json={"row": {"title": "Test", "metadata": "not-json{"}},
headers=_headers(token),
)
assert response.status_code == 400
assert "metadata" in response.json()["errors"][0]
@pytest.mark.asyncio
async def test_validation_on_update(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/1/-/update",
json={"update": {"author_email": "invalid"}},
headers=_headers(token),
)
assert response.status_code == 400
assert "author_email" in response.json()["errors"][0]
@pytest.mark.asyncio
async def test_validation_allows_null(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/-/insert",
json={"row": {"title": "Test", "author_email": None}},
headers=_headers(token),
)
assert response.status_code == 201
@pytest.mark.asyncio
async def test_validation_allows_empty_string(ds_ct):
await ds_ct.invoke_startup()
token = write_token(ds_ct)
response = await ds_ct.client.post(
"/data/posts/-/insert",
json={"row": {"title": "Test", "author_email": ""}},
headers=_headers(token),
)
assert response.status_code == 201
# --- ColumnType base class ---
@pytest.mark.asyncio
async def test_column_type_base_defaults():
ct = ColumnType(name="test", description="Test type")
assert await ct.render_cell(
"val", "col", "tbl", "db", None, None, None
) is None
assert await ct.validate("val", None, None) is None
assert await ct.transform_value("val", None, None) == "val"
# --- render_cell extra with column types ---
@pytest.mark.asyncio
async def test_render_cell_extra_with_column_types(ds_ct):
await ds_ct.invoke_startup()
response = await ds_ct.client.get("/data/posts.json?_extra=render_cell")
assert response.status_code == 200
data = response.json()
rendered = data["render_cell"][0]
assert "mailto:" in rendered["author_email"]
assert "href" in rendered["website"]

View file

@ -1948,3 +1948,14 @@ def test_metadata_plugin_config_treated_as_config(
assert "plugins" not in actual_metadata
assert actual_metadata == expected_metadata
assert ds.config == expected_config
@pytest.mark.asyncio
async def test_hook_register_column_types():
ds = Datasette()
await ds.invoke_startup()
# Built-in column types should be registered
assert ds.get_column_type_class("url") is not None
assert ds.get_column_type_class("email") is not None
assert ds.get_column_type_class("json") is not None
assert ds.get_column_type_class("nonexistent") is None