Only show valid SQL write templates

Closes #2753

Demo: https://github.com/simonw/datasette/issues/2753#issuecomment-4570071413
This commit is contained in:
Simon Willison 2026-05-28 20:01:48 -07:00
commit 9e377e8b90
3 changed files with 328 additions and 105 deletions

View file

@ -89,16 +89,18 @@ form.sql.core input[data-execute-write-submit]:disabled {
<p class="execute-write-template-controls">
<label for="execute-write-template-table">Table</label>
<select id="execute-write-template-table">
{% for table_name, columns in write_template_tables|dictsort %}
<option value="{{ table_name }}">{{ table_name }}</option>
{% for table_name, table in write_template_tables|dictsort %}
<option value="{{ table_name }}"{% for operation, template_sql in table.templates|dictsort %} data-template-{{ operation }}-sql="{{ template_sql }}"{% endfor %}>{{ table_name }}</option>
{% endfor %}
</select>
<button type="button" data-sql-template="insert">Insert row</button>
<button type="button" data-sql-template="update">Update rows</button>
<button type="button" data-sql-template="delete">Delete rows</button>
{% for operation in write_template_operations %}
<button type="button" data-sql-template="{{ operation.name }}">{{ operation.label }}</button>
{% endfor %}
</p>
</details>
</div>
{% else %}
<p class="message-warning execute-write-template-unavailable">You don't currently have permission to insert, edit or delete from any tables.</p>
{% endif %}
<p class="sql-editor"><textarea id="sql-editor" name="sql"{% if sql %} style="height: {{ sql.split("\n")|length + 2 }}em"{% endif %}>{{ sql }}</textarea></p>
@ -242,119 +244,43 @@ window.addEventListener("DOMContentLoaded", () => {
{% if write_template_tables %}
<script>
window.addEventListener("DOMContentLoaded", () => {
const tableColumns = {{ write_template_tables|tojson(2) }};
const tableSelect = document.querySelector("#execute-write-template-table");
const templateButtons = document.querySelectorAll("[data-sql-template]");
function quoteIdentifier(identifier) {
return `"${identifier.replace(/"/g, '""')}"`;
function dataKey(operation) {
return `template${operation.charAt(0).toUpperCase()}${operation.slice(1)}Sql`;
}
function parameterNames(columns) {
const seen = new Set();
const names = {};
columns.forEach((column) => {
let base = column
.toLowerCase()
.replace(/[^a-z0-9_]+/g, "_")
.replace(/^_+|_+$/g, "");
if (!base) {
base = "value";
}
if (/^[0-9]/.test(base)) {
base = `p_${base}`;
}
let name = base;
let index = 2;
while (seen.has(name)) {
name = `${base}_${index}`;
index += 1;
}
seen.add(name);
names[column] = name;
function selectedOption() {
return tableSelect ? tableSelect.options[tableSelect.selectedIndex] : null;
}
function templateSql(operation) {
const option = selectedOption();
return option ? option.dataset[dataKey(operation)] || "" : "";
}
function updateTemplateButtons() {
templateButtons.forEach((button) => {
button.hidden = !templateSql(button.dataset.sqlTemplate);
});
return names;
}
function preferredWhereColumn(table, columns) {
const lowerTableId = `${table.toLowerCase()}_id`;
return (
columns.find((column) => column.toLowerCase() === "id") ||
columns.find((column) => column.toLowerCase() === lowerTableId) ||
columns[0]
);
}
function insertSql(table, columns) {
const names = parameterNames(columns);
return [
`insert into ${quoteIdentifier(table)} (`,
columns.map((column) => ` ${quoteIdentifier(column)}`).join(",\n"),
")",
"values (",
columns.map((column) => ` :${names[column]}`).join(",\n"),
")",
].join("\n");
}
function updateSql(table, columns) {
const names = parameterNames(columns);
const whereColumn = preferredWhereColumn(table, columns);
const setColumns = columns.filter((column) => column !== whereColumn);
if (!setColumns.length) {
return [
`update ${quoteIdentifier(table)}`,
`set ${quoteIdentifier(whereColumn)} = :new_${names[whereColumn]}`,
`where ${quoteIdentifier(whereColumn)} = :${names[whereColumn]}`,
].join("\n");
}
return [
`update ${quoteIdentifier(table)}`,
"set " +
setColumns
.map((column, index) => {
const indent = index ? " " : "";
return `${indent}${quoteIdentifier(column)} = :${names[column]}`;
})
.join(",\n"),
`where ${quoteIdentifier(whereColumn)} = :${names[whereColumn]}`,
].join("\n");
}
function deleteSql(table, columns) {
const names = parameterNames(columns);
const whereColumn = preferredWhereColumn(table, columns);
return [
`delete from ${quoteIdentifier(table)}`,
`where ${quoteIdentifier(whereColumn)} = :${names[whereColumn]}`,
].join("\n");
}
function templateSql(operation, table, columns) {
if (operation === "insert") {
return insertSql(table, columns);
}
if (operation === "update") {
return updateSql(table, columns);
}
return deleteSql(table, columns);
}
templateButtons.forEach((button) => {
button.addEventListener("click", () => {
const table = tableSelect.value;
const columns = tableColumns[table] || [];
if (!columns.length) {
const sql = templateSql(button.dataset.sqlTemplate);
if (!sql) {
return;
}
const url = new URL(window.location.href);
url.searchParams.set(
"sql",
templateSql(button.dataset.sqlTemplate, table, columns)
);
url.searchParams.set("sql", sql);
window.location.href = url.toString();
});
});
if (tableSelect) {
tableSelect.addEventListener("change", updateTemplateButtons);
}
updateTemplateButtons();
});
</script>
{% endif %}

View file

@ -1,3 +1,4 @@
import re
from urllib.parse import urlencode
from datasette.resources import DatabaseResource
@ -22,6 +23,187 @@ from .query_helpers import (
_wants_json,
)
WRITE_TEMPLATE_LABELS = {
"insert": "Insert row",
"update": "Update rows",
"delete": "Delete rows",
}
WRITE_TEMPLATE_OPERATIONS = tuple(WRITE_TEMPLATE_LABELS)
def _parameter_names(columns):
seen = set()
names = {}
for column in columns:
base = re.sub(r"[^a-z0-9_]+", "_", column.lower())
base = base.strip("_") or "value"
if base[0].isdigit():
base = "p_{}".format(base)
name = base
index = 2
while name in seen:
name = "{}_{}".format(base, index)
index += 1
seen.add(name)
names[column] = name
return names
def _quote_identifier(identifier):
return '"{}"'.format(identifier.replace('"', '""'))
def _preferred_where_column(table, columns):
lower_table_id = "{}_id".format(table.lower())
return (
next((column for column in columns if column.lower() == "id"), None)
or next(
(column for column in columns if column.lower() == lower_table_id), None
)
or columns[0]
)
def _auto_incrementing_primary_key(columns):
primary_keys = [column for column in columns if column.is_pk]
if len(primary_keys) != 1:
return None
primary_key = primary_keys[0]
if primary_key.type and primary_key.type.lower() == "integer":
return primary_key.name
return None
def _insert_template_sql(table, columns):
column_names = [column.name for column in columns]
auto_pk = _auto_incrementing_primary_key(columns)
insert_columns = [column for column in column_names if column != auto_pk]
if not insert_columns:
return "insert into {}\ndefault values".format(_quote_identifier(table))
names = _parameter_names(insert_columns)
return "\n".join(
(
"insert into {} (".format(_quote_identifier(table)),
",\n".join(
" {}".format(_quote_identifier(column)) for column in insert_columns
),
")",
"values (",
",\n".join(" :{}".format(names[column]) for column in insert_columns),
")",
)
)
def _update_template_sql(table, columns):
column_names = [column.name for column in columns]
names = _parameter_names(column_names)
where_column = _preferred_where_column(table, column_names)
set_columns = [column for column in column_names if column != where_column]
if not set_columns:
return "\n".join(
(
"update {}".format(_quote_identifier(table)),
"set {} = :new_{}".format(
_quote_identifier(where_column), names[where_column]
),
"where {} = :{}".format(
_quote_identifier(where_column), names[where_column]
),
)
)
return "\n".join(
(
"update {}".format(_quote_identifier(table)),
"set "
+ ",\n".join(
"{}{} = :{}".format(
" " if index else "",
_quote_identifier(column),
names[column],
)
for index, column in enumerate(set_columns)
),
"where {} = :{}".format(
_quote_identifier(where_column), names[where_column]
),
)
)
def _delete_template_sql(table, columns):
column_names = [column.name for column in columns]
names = _parameter_names(column_names)
where_column = _preferred_where_column(table, column_names)
return "\n".join(
(
"delete from {}".format(_quote_identifier(table)),
"where {} = :{}".format(
_quote_identifier(where_column), names[where_column]
),
)
)
def _template_sqls_for_table(table, columns):
return {
"insert": _insert_template_sql(table, columns),
"update": _update_template_sql(table, columns),
"delete": _delete_template_sql(table, columns),
}
async def _template_sql_allowed(datasette, db, sql, actor):
params = {parameter: "" for parameter in _derived_query_parameters(sql)}
try:
analysis = await db.analyze_sql(sql, params)
except sqlite3.DatabaseError:
return False
if not _analysis_is_write(analysis):
return False
analysis_rows = await _analysis_rows_with_permissions(datasette, analysis, actor)
return _execute_write_disabled_reason(sql, None, analysis_rows) is None
async def _write_template_tables(
datasette, db, table_columns, hidden_table_names, actor
):
write_template_tables = {}
for table in table_columns:
if table in hidden_table_names or not table_columns[table]:
continue
column_details = [
column
for column in await db.table_column_details(table)
if not column.hidden
]
if not column_details:
continue
templates = {}
for operation, sql in _template_sqls_for_table(table, column_details).items():
if await _template_sql_allowed(datasette, db, sql, actor):
templates[operation] = sql
if templates:
write_template_tables[table] = {
"templates": templates,
}
return write_template_tables
def _write_template_operations(write_template_tables):
operations = []
for operation in WRITE_TEMPLATE_OPERATIONS:
if any(
operation in table["templates"] for table in write_template_tables.values()
):
operations.append(
{
"name": operation,
"label": WRITE_TEMPLATE_LABELS[operation],
}
)
return operations
class ExecuteWriteView(BaseView):
name = "execute-write"
@ -47,11 +229,10 @@ class ExecuteWriteView(BaseView):
analysis_rows = []
table_columns = await _table_columns(self.ds, db.name)
hidden_table_names = set(await db.hidden_table_names())
write_template_tables = {
table: columns
for table, columns in table_columns.items()
if columns and table not in hidden_table_names
}
write_template_tables = await _write_template_tables(
self.ds, db, table_columns, hidden_table_names, request.actor
)
write_template_operations = _write_template_operations(write_template_tables)
if sql and analysis_error is None:
try:
parameter_names = _derived_query_parameters(sql)
@ -107,6 +288,7 @@ class ExecuteWriteView(BaseView):
"execute_disabled_reason": execute_disabled_reason,
"table_columns": table_columns,
"write_template_tables": write_template_tables,
"write_template_operations": write_template_operations,
"save_query_url": save_query_url,
"save_query_base_url": save_query_base_url,
},

View file

@ -1,4 +1,6 @@
import json
import re
from html import unescape
import pytest
@ -8,6 +10,19 @@ from datasette.stored_queries import StoredQuery, StoredQueryPage
from datasette.utils.asgi import Forbidden
def _template_option_attributes(html, table):
match = re.search(r'<option value="{}"([^>]*)>'.format(table), html)
assert match, "Could not find template option for {}".format(table)
return match.group(1)
def _template_sql(html, table, operation):
attrs = _template_option_attributes(html, table)
match = re.search(r'data-template-{}-sql="([^"]*)"'.format(operation), attrs)
assert match, "Could not find {} template for {}".format(operation, table)
return unescape(match.group(1))
async def add_numbered_queries(ds, database, count):
for i in range(1, count + 1):
await ds.add_query(
@ -1360,7 +1375,8 @@ async def test_execute_write_get_prepopulates_without_executing():
)
assert "<h2>Query operations</h2>" in response.text
assert "<summary>Start with a template</summary>" in response.text
assert '<option value="dogs">dogs</option>' in response.text
assert '<option value="dogs"' in response.text
assert "data-template-insert-sql=" in response.text
assert 'data-sql-template="insert"' in response.text
assert 'data-sql-template="update"' in response.text
assert 'data-sql-template="delete"' in response.text
@ -1469,6 +1485,105 @@ async def test_execute_write_disabled_submit_explains_denied_operations():
)
@pytest.mark.asyncio
async def test_execute_write_templates_are_filtered_by_permission_and_server_generated():
ds = Datasette(
memory=True,
default_deny=True,
config={
"databases": {
"data": {
"permissions": {
"view-database": {"id": ["writer", "deleter", "viewer"]},
"execute-write-sql": {"id": ["writer", "deleter", "viewer"]},
},
"tables": {
"dogs": {
"permissions": {
"view-table": {"id": ["writer", "deleter"]},
"insert-row": {"id": "writer"},
"update-row": {"id": "writer"},
"delete-row": {"id": ["writer", "deleter"]},
}
},
"manual": {
"permissions": {
"view-table": {"id": "writer"},
"insert-row": {"id": "writer"},
"update-row": {"id": "writer"},
"delete-row": {"id": "writer"},
}
},
},
}
}
},
)
db = ds.add_memory_database("execute_write_templates", name="data")
await db.execute_write(
"create table dogs (id integer primary key, name text, age integer)"
)
await db.execute_write("create table manual (id text primary key, name text)")
await db.execute_write("create table cats (id integer primary key, name text)")
await ds.invoke_startup()
writer_response = await ds.client.get(
"/data/-/execute-write", actor={"id": "writer"}
)
deleter_response = await ds.client.get(
"/data/-/execute-write", actor={"id": "deleter"}
)
viewer_response = await ds.client.get(
"/data/-/execute-write", actor={"id": "viewer"}
)
assert writer_response.status_code == 200
assert "<summary>Start with a template</summary>" in writer_response.text
assert "You don't currently have permission" not in writer_response.text
assert '<option value="dogs"' in writer_response.text
assert '<option value="manual"' in writer_response.text
assert '<option value="cats"' not in writer_response.text
assert "function insertSql(" not in writer_response.text
assert "function updateSql(" not in writer_response.text
assert "function deleteSql(" not in writer_response.text
dogs_insert_sql = _template_sql(writer_response.text, "dogs", "insert")
assert '"id"' not in dogs_insert_sql
assert '"name"' in dogs_insert_sql
assert '"age"' in dogs_insert_sql
assert ":name" in dogs_insert_sql
assert ":age" in dogs_insert_sql
dogs_update_sql = _template_sql(writer_response.text, "dogs", "update")
assert 'where "id" = :id' in dogs_update_sql
assert '"id" = :new_id' not in dogs_update_sql
manual_insert_sql = _template_sql(writer_response.text, "manual", "insert")
assert '"id"' in manual_insert_sql
assert ":id" in manual_insert_sql
assert deleter_response.status_code == 200
assert "<summary>Start with a template</summary>" in deleter_response.text
assert '<option value="dogs"' in deleter_response.text
dogs_attrs = _template_option_attributes(deleter_response.text, "dogs")
assert "data-template-delete-sql" in dogs_attrs
assert "data-template-insert-sql" not in dogs_attrs
assert "data-template-update-sql" not in dogs_attrs
assert 'data-sql-template="delete"' in deleter_response.text
assert 'data-sql-template="insert"' not in deleter_response.text
assert 'data-sql-template="update"' not in deleter_response.text
assert viewer_response.status_code == 200
assert "<summary>Start with a template</summary>" not in viewer_response.text
assert (
"You don't currently have permission to insert, edit or delete from any tables."
in viewer_response.text
)
assert "data-template-insert-sql" not in viewer_response.text
assert "data-template-update-sql" not in viewer_response.text
assert "data-template-delete-sql" not in viewer_response.text
@pytest.mark.asyncio
async def test_execute_write_analyze_endpoint_uses_sql_only():
ds = Datasette(memory=True, default_deny=True)