Refactor ColumnType: register classes, return instances with config

- register_column_types() now returns classes instead of instances
- ColumnType.__init__ takes optional config=, baking it into the instance
- get_column_type() returns a ColumnType instance (or None) instead of a
  (name, config) tuple
- get_column_types() returns {col: ColumnType instance} instead of tuples
- Remove get_column_type_class() - no longer needed
- render_cell/validate/transform_value methods no longer take config arg;
  use self.config instead
- render_cell hook takes column_type (ColumnType or None) instead of
  column_type + column_type_config

https://claude.ai/code/session_01SvPEPqHgURTWESRp28pTC3
This commit is contained in:
Claude 2026-03-17 05:18:14 +00:00
commit dd9b83301c
No known key found for this signature in database
11 changed files with 227 additions and 243 deletions

View file

@ -693,14 +693,14 @@ class Datasette:
action_abbrs[action.abbr] = action
self.actions[action.name] = action
# Register column types
# Register column types (classes, not instances)
self._column_types = {}
for hook in pm.hook.register_column_types(datasette=self):
if hook:
for ct in hook:
if ct.name in self._column_types:
raise StartupError(f"Duplicate column type name: {ct.name}")
self._column_types[ct.name] = ct
for ct_cls in hook:
if ct_cls.name in self._column_types:
raise StartupError(f"Duplicate column type name: {ct_cls.name}")
self._column_types[ct_cls.name] = ct_cls
for hook in pm.hook.prepare_jinja2_environment(
env=self._jinja_env, datasette=self
@ -984,10 +984,10 @@ class Datasette:
db_name, table_name, col_name, col_type, config
)
async def get_column_type(self, database: str, resource: str, column: str) -> tuple:
async def get_column_type(self, database: str, resource: str, column: str):
"""
Return (column_type_name, config_dict) for a specific column,
or (None, None) if no column type is assigned.
Return a ColumnType instance (with config baked in) for a specific
column, or None if no column type is assigned.
"""
row = await self.get_internal_database().execute(
"SELECT column_type, config FROM column_types "
@ -996,13 +996,16 @@ class Datasette:
)
rows = row.rows
if not rows:
return None, None
ct, config = rows[0]
return (ct, json.loads(config) if config else None)
return None
ct_name, config = rows[0]
ct_cls = self._column_types.get(ct_name)
if ct_cls is None:
return None
return ct_cls(config=json.loads(config) if config else None)
async def get_column_types(self, database: str, resource: str) -> dict:
"""
Return {column_name: (column_type_name, config_dict_or_None)}
Return {column_name: ColumnType instance (with config)}
for all columns with assigned types on the given resource.
"""
rows = await self.get_internal_database().execute(
@ -1010,10 +1013,13 @@ class Datasette:
"WHERE database_name = ? AND resource_name = ?",
[database, resource],
)
return {
row[0]: (row[1], json.loads(row[2]) if row[2] else None)
for row in rows.rows
}
result = {}
for row in rows.rows:
col_name, ct_name, config = row
ct_cls = self._column_types.get(ct_name)
if ct_cls is not None:
result[col_name] = ct_cls(config=json.loads(config) if config else None)
return result
async def set_column_type(
self,
@ -1047,13 +1053,6 @@ class Datasette:
[database, resource, column],
)
def get_column_type_class(self, column_type_name: str):
"""
Return the registered ColumnType instance for a given name,
or None if no plugin has registered that name.
"""
return self._column_types.get(column_type_name)
def get_internal_database(self):
return self._internal_database

View file

@ -8,31 +8,35 @@ class ColumnType:
Examples: "markdown", "file", "email", "url", "point", "image".
- ``description``: Human-readable label for admin UI dropdowns.
Examples: "Markdown text", "File reference", "Email address".
Instantiate with an optional ``config`` dict to bind per-column
configuration::
ct = MyColumnType(config={"key": "value"})
ct.config # {"key": "value"}
"""
name: str
description: str
async def render_cell(
self, value, column, table, database, datasette, request, config
):
def __init__(self, config=None):
self.config = config
async def render_cell(self, value, column, table, database, datasette, request):
"""
Return an HTML string to render this cell value, or None to
fall through to the default render_cell plugin hook chain.
``config`` is the parsed JSON config dict for this specific
column assignment, or None.
"""
return None
async def validate(self, value, config, datasette):
async def validate(self, value, datasette):
"""
Validate a value before it is written. Return None if valid,
or a string error message if invalid.
"""
return None
async def transform_value(self, value, config, datasette):
async def transform_value(self, value, datasette):
"""
Transform a value before it appears in JSON API output.
Return the transformed value. Default: return unchanged.

View file

@ -11,15 +11,13 @@ class UrlColumnType(ColumnType):
name = "url"
description = "URL"
async def render_cell(
self, value, column, table, database, datasette, request, config
):
async def render_cell(self, value, column, table, database, datasette, request):
if not value or not isinstance(value, str):
return None
escaped = markupsafe.escape(value.strip())
return markupsafe.Markup(f'<a href="{escaped}">{escaped}</a>')
async def validate(self, value, config, datasette):
async def validate(self, value, datasette):
if value is None or value == "":
return None
if not isinstance(value, str):
@ -33,15 +31,13 @@ class EmailColumnType(ColumnType):
name = "email"
description = "Email address"
async def render_cell(
self, value, column, table, database, datasette, request, config
):
async def render_cell(self, value, column, table, database, datasette, request):
if not value or not isinstance(value, str):
return None
escaped = markupsafe.escape(value.strip())
return markupsafe.Markup(f'<a href="mailto:{escaped}">{escaped}</a>')
async def validate(self, value, config, datasette):
async def validate(self, value, datasette):
if value is None or value == "":
return None
if not isinstance(value, str):
@ -55,9 +51,7 @@ class JsonColumnType(ColumnType):
name = "json"
description = "JSON data"
async def render_cell(
self, value, column, table, database, datasette, request, config
):
async def render_cell(self, value, column, table, database, datasette, request):
if value is None:
return None
try:
@ -68,7 +62,7 @@ class JsonColumnType(ColumnType):
except (json.JSONDecodeError, TypeError):
return None
async def validate(self, value, config, datasette):
async def validate(self, value, datasette):
if value is None or value == "":
return None
if isinstance(value, str):
@ -81,8 +75,4 @@ class JsonColumnType(ColumnType):
@hookimpl
def register_column_types(datasette):
return [
UrlColumnType(),
EmailColumnType(),
JsonColumnType(),
]
return [UrlColumnType, EmailColumnType, JsonColumnType]

View file

@ -65,7 +65,6 @@ def render_cell(
datasette,
request,
column_type,
column_type_config,
):
"""Customize rendering of HTML table cell values"""

View file

@ -1206,7 +1206,6 @@ async def display_rows(datasette, database, request, rows, columns):
datasette=datasette,
request=request,
column_type=None,
column_type_config=None,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:

View file

@ -184,25 +184,20 @@ class RowView(DataView):
for row in rows:
rendered_row = {}
for value, column in zip(row, columns):
ct_info = ct_map.get(column)
ct_name = ct_info[0] if ct_info else None
ct_config = ct_info[1] if ct_info else None
ct = ct_map.get(column)
plugin_display_value = None
# Try column type render_cell first
if ct_name:
ct_class = self.ds.get_column_type_class(ct_name)
if ct_class:
candidate = await ct_class.render_cell(
value=value,
column=column,
table=table,
database=database,
datasette=self.ds,
request=request,
config=ct_config,
)
if candidate is not None:
plugin_display_value = candidate
if ct:
candidate = await ct.render_cell(
value=value,
column=column,
table=table,
database=database,
datasette=self.ds,
request=request,
)
if candidate is not None:
plugin_display_value = candidate
if plugin_display_value is None:
for candidate in pm.hook.render_cell(
row=row,
@ -213,8 +208,7 @@ class RowView(DataView):
database=database,
datasette=self.ds,
request=request,
column_type=ct_name,
column_type_config=ct_config,
column_type=ct,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:

View file

@ -141,13 +141,10 @@ async def _validate_column_types(datasette, database_name, table_name, rows):
return []
errors = []
for row in rows:
for col_name, (ct_name, ct_config) in ct_map.items():
for col_name, ct in ct_map.items():
if col_name not in row:
continue
ct_class = datasette.get_column_type_class(ct_name)
if ct_class is None:
continue
error = await ct_class.validate(row[col_name], ct_config, datasette)
error = await ct.validate(row[col_name], datasette)
if error:
errors.append(f"{col_name}: {error}")
return errors
@ -211,10 +208,10 @@ async def display_columns_and_rows(
"column_type": None,
"column_type_config": None,
}
ct_info = column_types_map.get(r[0])
if ct_info:
col_dict["column_type"] = ct_info[0]
col_dict["column_type_config"] = ct_info[1]
ct = column_types_map.get(r[0])
if ct:
col_dict["column_type"] = ct.name
col_dict["column_type_config"] = ct.config
columns.append(col_dict)
column_to_foreign_key_table = {
@ -257,22 +254,18 @@ async def display_columns_and_rows(
# First try column type render_cell, then plugins
# pylint: disable=no-member
plugin_display_value = None
ct_name = column_dict.get("column_type")
ct_config = column_dict.get("column_type_config")
if ct_name:
ct_class = datasette.get_column_type_class(ct_name)
if ct_class:
candidate = await ct_class.render_cell(
value=value,
column=column,
table=table_name,
database=database_name,
datasette=datasette,
request=request,
config=ct_config,
)
if candidate is not None:
plugin_display_value = candidate
ct = column_types_map.get(column)
if ct:
candidate = await ct.render_cell(
value=value,
column=column,
table=table_name,
database=database_name,
datasette=datasette,
request=request,
)
if candidate is not None:
plugin_display_value = candidate
if plugin_display_value is None:
for candidate in pm.hook.render_cell(
row=row,
@ -283,8 +276,7 @@ async def display_columns_and_rows(
database=database_name,
datasette=datasette,
request=request,
column_type=ct_name,
column_type_config=ct_config,
column_type=ct,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
@ -1559,25 +1551,20 @@ async def table_view_data(
for row in rows:
rendered_row = {}
for value, column in zip(row, col_names):
ct_info = ct_map.get(column)
ct_name = ct_info[0] if ct_info else None
ct_config = ct_info[1] if ct_info else None
ct = ct_map.get(column)
plugin_display_value = None
# Try column type render_cell first
if ct_name:
ct_class = datasette.get_column_type_class(ct_name)
if ct_class:
candidate = await ct_class.render_cell(
value=value,
column=column,
table=table_name,
database=database_name,
datasette=datasette,
request=request,
config=ct_config,
)
if candidate is not None:
plugin_display_value = candidate
if ct:
candidate = await ct.render_cell(
value=value,
column=column,
table=table_name,
database=database_name,
datasette=datasette,
request=request,
)
if candidate is not None:
plugin_display_value = candidate
if plugin_display_value is None:
for candidate in pm.hook.render_cell(
row=row,
@ -1588,8 +1575,7 @@ async def table_view_data(
database=database_name,
datasette=datasette,
request=request,
column_type=ct_name,
column_type_config=ct_config,
column_type=ct,
):
candidate = await await_me_maybe(candidate)
if candidate is not None:
@ -1612,10 +1598,10 @@ async def table_view_data(
ct_map = await datasette.get_column_types(database_name, table_name)
return {
col_name: {
"type": ct_name,
"config": ct_config,
"type": ct.name,
"config": ct.config,
}
for col_name, (ct_name, ct_config) in ct_map.items()
for col_name, ct in ct_map.items()
}
async def extra_metadata():
@ -1866,13 +1852,11 @@ async def table_view_data(
transformed_rows = []
for r in raw_sqlite_rows:
row_dict = dict(r)
for col_name, (ct_name, ct_config) in ct_map.items():
for col_name, ct in ct_map.items():
if col_name in row_dict:
ct_class = datasette.get_column_type_class(ct_name)
if ct_class:
row_dict[col_name] = await ct_class.transform_value(
row_dict[col_name], ct_config, datasette
)
row_dict[col_name] = await ct.transform_value(
row_dict[col_name], datasette
)
transformed_rows.append(row_dict)
data["rows"] = transformed_rows