Deny unsupported write SQL operations by default

Require view-table permission for reads discovered inside write SQL analysis, including INSERT ... SELECT and CREATE TABLE ... AS SELECT.

Record additional SQLite authorizer callbacks as Operation values so unsupported functions, savepoints, virtual table DDL, and unknown callbacks are denied unless explicitly handled.
This commit is contained in:
Simon Willison 2026-05-27 14:52:52 -07:00
commit 86d0e7335f
6 changed files with 433 additions and 68 deletions

View file

@ -592,6 +592,16 @@ PermissionRequirement = tuple[str, Resource]
def permission_for_operation(operation: Operation) -> PermissionRequirement | None:
if (
operation.operation == "read"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return (
"view-table",
TableResource(database=operation.database, table=operation.table),
)
write_actions = {
"insert": "insert-row",
"update": "update-row",
@ -648,6 +658,10 @@ def permission_for_operation(operation: Operation) -> PermissionRequirement | No
return None
def operation_should_be_ignored(operation: Operation) -> bool:
return operation.internal or operation.operation == "select"
def operation_is_write(operation: Operation) -> bool:
return operation.operation in {
"insert",
@ -659,11 +673,13 @@ def operation_is_write(operation: Operation) -> bool:
"begin",
"commit",
"rollback",
"savepoint",
"attach",
"detach",
"pragma",
"analyze",
"reindex",
"unknown",
}
@ -685,34 +701,19 @@ async def ensure_query_write_permissions(
except sqlite3.DatabaseError as ex:
raise Forbidden(f"Could not analyze query: {ex}") from ex
has_semantic_schema_operation = any(
operation.operation in {"create", "alter", "drop"}
and operation.target_type in {"table", "index", "view", "trigger"}
for operation in analysis.operations
)
for operation in analysis.operations:
if operation.internal and has_semantic_schema_operation:
continue
if has_semantic_schema_operation and operation.operation in {
"read",
"insert",
"update",
"delete",
"reindex",
}:
if operation_should_be_ignored(operation):
continue
permission = permission_for_operation(operation)
if permission is None:
if operation_is_write(operation):
raise Forbidden(
"Unsupported SQL operation: {} {}".format(
operation.operation, operation.target_type
)
raise Forbidden(
"Unsupported SQL operation: {} {}".format(
operation.operation, operation.target_type
)
continue
)
action, resource = permission
if operation.database != database:
raise Forbidden("Writable queries may not write to attached databases")
raise Forbidden("Writable queries may not access attached databases")
if not await datasette.allowed(
action=action,
resource=resource,

View file

@ -8,30 +8,39 @@ SQLOperation = Literal[
"insert",
"update",
"delete",
"select",
"function",
"create",
"alter",
"drop",
"begin",
"commit",
"rollback",
"savepoint",
"attach",
"detach",
"pragma",
"analyze",
"reindex",
"unknown",
]
SQLTargetType = Literal[
"table",
"index",
"view",
"trigger",
"virtual-table",
"schema",
"statement",
"transaction",
"database",
"pragma",
"function",
"unknown",
]
SQLTableOperation = Literal["read", "insert", "update", "delete"]
SQLSchemaOperation = Literal["create", "drop"]
SQLSchemaTargetType = Literal["index", "table", "trigger", "view", "virtual-table"]
@dataclass(frozen=True)
@ -73,19 +82,34 @@ _ACTION_TO_OPERATION: dict[int, SQLTableOperation] = {
}
# Values are (operation, target_type) pairs used to construct Operation objects.
_CREATE_ACTIONS = {
_CREATE_ACTIONS: dict[int, tuple[SQLSchemaOperation, SQLSchemaTargetType]] = {
sqlite3.SQLITE_CREATE_INDEX: ("create", "index"),
sqlite3.SQLITE_CREATE_TABLE: ("create", "table"),
sqlite3.SQLITE_CREATE_TRIGGER: ("create", "trigger"),
sqlite3.SQLITE_CREATE_VIEW: ("create", "view"),
}
_DROP_ACTIONS = {
_DROP_ACTIONS: dict[int, tuple[SQLSchemaOperation, SQLSchemaTargetType]] = {
sqlite3.SQLITE_DROP_INDEX: ("drop", "index"),
sqlite3.SQLITE_DROP_TABLE: ("drop", "table"),
sqlite3.SQLITE_DROP_TRIGGER: ("drop", "trigger"),
sqlite3.SQLITE_DROP_VIEW: ("drop", "view"),
}
for action_name, operation, target_type in (
def _add_schema_action(
action_name: str,
operation: SQLSchemaOperation,
target_type: SQLSchemaTargetType,
) -> None:
action_value = getattr(sqlite3, action_name, None)
if action_value is not None:
actions = _CREATE_ACTIONS if operation == "create" else _DROP_ACTIONS
actions[action_value] = (operation, target_type)
_TEMP_SCHEMA_ACTIONS: tuple[
tuple[str, SQLSchemaOperation, SQLSchemaTargetType], ...
] = (
("SQLITE_CREATE_TEMP_INDEX", "create", "index"),
("SQLITE_CREATE_TEMP_TABLE", "create", "table"),
("SQLITE_CREATE_TEMP_TRIGGER", "create", "trigger"),
@ -94,13 +118,76 @@ for action_name, operation, target_type in (
("SQLITE_DROP_TEMP_TABLE", "drop", "table"),
("SQLITE_DROP_TEMP_TRIGGER", "drop", "trigger"),
("SQLITE_DROP_TEMP_VIEW", "drop", "view"),
):
action_value = getattr(sqlite3, action_name, None)
if action_value is not None:
actions = _CREATE_ACTIONS if operation == "create" else _DROP_ACTIONS
actions[action_value] = (operation, target_type)
)
for schema_action in _TEMP_SCHEMA_ACTIONS:
_add_schema_action(*schema_action)
_SQLITE_SCHEMA_TABLES = {"sqlite_master", "sqlite_schema"}
_VTABLE_SCHEMA_ACTIONS: tuple[
tuple[str, SQLSchemaOperation, SQLSchemaTargetType], ...
] = (
("SQLITE_CREATE_VTABLE", "create", "virtual-table"),
("SQLITE_DROP_VTABLE", "drop", "virtual-table"),
)
for schema_action in _VTABLE_SCHEMA_ACTIONS:
_add_schema_action(*schema_action)
_SQLITE_SCHEMA_TABLES = {
"sqlite_master",
"sqlite_schema",
"sqlite_temp_master",
"sqlite_temp_schema",
}
_SQLITE_INTERNAL_SCHEMA_FUNCTIONS = {
"length",
"like",
"printf",
"sqlite_drop_column",
"sqlite_rename_column",
"sqlite_rename_quotefix",
"sqlite_rename_table",
"sqlite_rename_test",
"substr",
}
_AUTHORIZER_ACTION_NAMES = {
getattr(sqlite3, name): name
for name in (
"SQLITE_CREATE_INDEX",
"SQLITE_CREATE_TABLE",
"SQLITE_CREATE_TEMP_INDEX",
"SQLITE_CREATE_TEMP_TABLE",
"SQLITE_CREATE_TEMP_TRIGGER",
"SQLITE_CREATE_TEMP_VIEW",
"SQLITE_CREATE_TRIGGER",
"SQLITE_CREATE_VIEW",
"SQLITE_DELETE",
"SQLITE_DROP_INDEX",
"SQLITE_DROP_TABLE",
"SQLITE_DROP_TEMP_INDEX",
"SQLITE_DROP_TEMP_TABLE",
"SQLITE_DROP_TEMP_TRIGGER",
"SQLITE_DROP_TEMP_VIEW",
"SQLITE_DROP_TRIGGER",
"SQLITE_DROP_VIEW",
"SQLITE_INSERT",
"SQLITE_PRAGMA",
"SQLITE_READ",
"SQLITE_SELECT",
"SQLITE_TRANSACTION",
"SQLITE_UPDATE",
"SQLITE_ATTACH",
"SQLITE_DETACH",
"SQLITE_ALTER_TABLE",
"SQLITE_REINDEX",
"SQLITE_ANALYZE",
"SQLITE_CREATE_VTABLE",
"SQLITE_DROP_VTABLE",
"SQLITE_FUNCTION",
"SQLITE_SAVEPOINT",
"SQLITE_RECURSIVE",
)
if hasattr(sqlite3, name)
}
def analyze_sql_tables(
@ -287,6 +374,52 @@ def analyze_sql_tables(
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_SELECT:
record(
"select",
"statement",
database=None,
table=None,
sqlite_schema=sqlite_schema,
target=None,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_FUNCTION and arg2 is not None:
record(
"function",
"function",
database=None,
table=None,
sqlite_schema=sqlite_schema,
target=arg2,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_SAVEPOINT and arg1 is not None:
record(
"savepoint",
"transaction",
database=None,
table=None,
sqlite_schema=sqlite_schema,
target="{} {}".format(arg1, arg2) if arg2 is not None else arg1,
source=source,
)
return sqlite3.SQLITE_OK
action_name = _AUTHORIZER_ACTION_NAMES.get(action, "SQLITE_{}".format(action))
record(
"unknown",
"unknown",
database=database_for_schema(sqlite_schema),
table=None,
sqlite_schema=sqlite_schema,
target=action_name,
source=source,
)
return sqlite3.SQLITE_OK
conn.set_authorizer(authorizer)
@ -296,10 +429,46 @@ def analyze_sql_tables(
conn.set_authorizer(None)
has_schema_operation = any(
key.target_type in {"table", "index", "view", "trigger"}
key.target_type in {"table", "index", "view", "trigger", "virtual-table"}
and key.operation in {"create", "alter", "drop"}
for key in operations
)
dropped_tables = {
(key.database, key.table)
for key in operations
if key.operation == "drop" and key.target_type == "table"
}
def key_is_drop_table_delete(key: OperationKey) -> bool:
return (
key.operation == "delete"
and key.target_type == "table"
and (key.database, key.table) in dropped_tables
)
has_user_table_access_in_schema_operation = any(
key.operation in {"read", "insert", "update", "delete"}
and key.target_type == "table"
and not key.internal
and not key_is_drop_table_delete(key)
for key in operations
)
def operation_is_internal(key: OperationKey) -> bool:
if key.internal or (has_schema_operation and key.target_type == "schema"):
return True
if has_schema_operation and key.operation == "reindex":
return True
if (
has_schema_operation
and not has_user_table_access_in_schema_operation
and key.operation == "function"
and key.target in _SQLITE_INTERNAL_SCHEMA_FUNCTIONS
):
return True
if key_is_drop_table_delete(key):
return True
return False
return SQLAnalysis(
operations=tuple(
@ -312,8 +481,7 @@ def analyze_sql_tables(
target=key.target,
columns=tuple(sorted(columns)),
source=key.source,
internal=key.internal
or (has_schema_operation and key.target_type == "schema"),
internal=operation_is_internal(key),
)
for key, columns in operations.items()
)

View file

@ -99,9 +99,7 @@ class ExecuteWriteView(BaseView):
"parameter_names": parameter_names,
"parameter_values": parameter_values,
"analysis_error": analysis_error,
"analysis_rows": [
row for row in analysis_rows if row["operation"] != "read"
],
"analysis_rows": analysis_rows,
"execution_message": execution_message,
"execution_links": execution_links,
"execution_ok": execution_ok,

View file

@ -5,6 +5,7 @@ from datasette.resources import DatabaseResource
from datasette.stored_queries import (
StoredQuery,
operation_is_write,
operation_should_be_ignored,
permission_for_operation,
)
from datasette.utils import (
@ -203,29 +204,10 @@ async def _analyze_user_query(datasette, db, sql, *, actor):
return is_write, derived, analysis
def _semantic_schema_operation_is_present(operations: tuple[Operation, ...]) -> bool:
return any(
operation.operation in {"create", "alter", "drop"}
and operation.target_type in {"table", "index", "view", "trigger"}
for operation in operations
)
def _display_operations(analysis: SQLAnalysis) -> list[Operation]:
has_semantic_schema_operation = _semantic_schema_operation_is_present(
analysis.operations
)
operations = []
for operation in analysis.operations:
if operation.internal and has_semantic_schema_operation:
continue
if has_semantic_schema_operation and operation.operation in {
"read",
"insert",
"update",
"delete",
"reindex",
}:
if operation_should_be_ignored(operation):
continue
operations.append(operation)
return operations
@ -252,6 +234,7 @@ async def _analysis_rows_with_permissions(
datasette, analysis: SQLAnalysis, actor
) -> list[dict[str, object]]:
rows = _analysis_rows(analysis)
is_write = _analysis_is_write(analysis)
for row, operation in zip(rows, _display_operations(analysis)):
permission = permission_for_operation(operation)
if permission:
@ -261,7 +244,7 @@ async def _analysis_rows_with_permissions(
resource=resource,
actor=actor,
)
elif operation_is_write(operation):
elif is_write:
row["allowed"] = False
else:
row["allowed"] = None
@ -360,7 +343,7 @@ async def _execute_write_analysis_data(datasette, db, sql, actor):
"ok": analysis_error is None,
"parameters": parameter_names,
"analysis_error": analysis_error,
"analysis_rows": [row for row in analysis_rows if row["operation"] != "read"],
"analysis_rows": analysis_rows,
"execute_disabled": bool(
(not sql)
or analysis_error
@ -374,6 +357,7 @@ async def _query_create_analysis_data(datasette, db, sql, actor):
parameter_names = []
analysis_rows = []
analysis_error = None
analysis: SQLAnalysis | None = None
if has_sql:
try:
parameter_names = _derived_query_parameters(sql)
@ -390,9 +374,7 @@ async def _query_create_analysis_data(datasette, db, sql, actor):
"analysis_error": analysis_error,
"analysis_rows": analysis_rows,
"has_sql": has_sql,
"analysis_is_write": bool(
analysis_rows and any(row["required_permission"] for row in analysis_rows)
),
"analysis_is_write": _analysis_is_write(analysis) if analysis else False,
"save_disabled": bool(
(not has_sql)
or analysis_error

View file

@ -1181,11 +1181,10 @@ async def test_create_query_ui_and_arbitrary_sql_save_link():
assert '<th scope="col">Required permission</th>' in create_response.text
assert '<th scope="col">Source</th>' not in create_response.text
assert "<td><code>read</code></td>" in create_response.text
assert "<td><code>view-table</code></td>" in create_response.text
assert (
create_response.text.count(
'<td><span class="execute-write-analysis-na">n/a</span></td>'
)
== 2
'<td><span class="execute-write-analysis-na">n/a</span></td>'
not in create_response.text
)
assert create_response.text.index(
'value="Save query"'
@ -1255,9 +1254,9 @@ async def test_create_query_analyze_endpoint_uses_sql_only():
"operation": "read",
"database": "data",
"table": "dogs",
"required_permission": "",
"required_permission": "view-table",
"source": None,
"allowed": None,
"allowed": True,
}
]
@ -1375,7 +1374,8 @@ async def test_execute_write_get_prepopulates_without_executing():
assert '<th scope="col">Required permission</th>' in response.text
assert "<td><code>insert</code></td>" in response.text
assert "<td><code>update</code></td>" in response.text
assert "<td><code>read</code></td>" not in response.text
assert "<td><code>read</code></td>" in response.text
assert "<td><code>view-table</code></td>" in response.text
assert 'action="/data/-/execute-write"' in response.text
assert "insert into dogs (name) values (&#39;Cleo&#39;)" in response.text
assert (await db.execute("select count(*) from dogs")).first()[0] == 0
@ -1643,6 +1643,127 @@ async def test_execute_write_post_requires_database_and_table_permissions():
assert (await db.execute("select name from dogs")).first()[0] == "Cleo"
@pytest.mark.asyncio
async def test_execute_write_insert_select_requires_view_table_on_source():
ds = Datasette(
memory=True,
default_deny=True,
config={
"databases": {
"data": {
"permissions": {
"view-database": {"id": "writer"},
"execute-write-sql": {"id": "writer"},
},
"tables": {
"secret": {
"permissions": {"view-table": {"id": "someone-else"}}
},
"public_log": {"permissions": {"insert-row": {"id": "writer"}}},
},
}
}
},
)
db = ds.add_memory_database("execute_write_insert_select_source", name="data")
await db.execute_write("create table secret (value text)")
await db.execute_write("create table public_log (value text)")
await db.execute_write("insert into secret values ('sensitive')")
await ds.invoke_startup()
denied_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "writer"},
json={"sql": "insert into public_log(value) select value from secret"},
)
assert denied_response.status_code == 403
assert denied_response.json()["errors"] == [
"Permission denied: need view-table on data/secret"
]
assert (await db.execute("select value from public_log")).dicts() == []
@pytest.mark.asyncio
async def test_execute_write_create_table_as_select_requires_view_table_on_source():
ds = Datasette(
memory=True,
default_deny=True,
config={
"databases": {
"data": {
"permissions": {
"view-database": {"id": "creator"},
"execute-write-sql": {"id": "creator"},
"create-table": {"id": "creator"},
},
"tables": {
"secret": {
"permissions": {"view-table": {"id": "someone-else"}}
}
},
}
}
},
)
db = ds.add_memory_database("execute_write_create_as_select_source", name="data")
await db.execute_write("create table secret (value text)")
await db.execute_write("insert into secret values ('sensitive')")
await ds.invoke_startup()
denied_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "creator"},
json={"sql": "create table copied_secret as select value from secret"},
)
assert denied_response.status_code == 403
assert denied_response.json()["errors"] == [
"Permission denied: need view-table on data/secret"
]
assert not await db.table_exists("copied_secret")
@pytest.mark.asyncio
async def test_execute_write_rejects_function_operations():
ds = Datasette(
memory=True,
default_deny=True,
config={
"databases": {
"data": {
"permissions": {
"view-database": {"id": "writer"},
"execute-write-sql": {"id": "writer"},
},
"tables": {
"dogs": {
"permissions": {
"insert-row": {"id": "writer"},
}
}
},
}
}
},
)
db = ds.add_memory_database("execute_write_function_operation", name="data")
await db.execute_write("create table dogs (id integer primary key, name text)")
await ds.invoke_startup()
denied_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "writer"},
json={"sql": "insert into dogs (name) values (upper('cleo'))"},
)
assert denied_response.status_code == 403
assert denied_response.json()["errors"] == [
"Unsupported SQL operation: function function"
]
assert (await db.execute("select name from dogs")).dicts() == []
@pytest.mark.asyncio
async def test_execute_write_create_table_uses_create_table_permission():
ds = Datasette(
@ -1733,6 +1854,7 @@ async def test_execute_write_alter_and_drop_table_use_schema_permissions():
"permissions": {
"alter-table": {"id": "alterer"},
"drop-table": {"id": "dropper"},
"view-table": {"id": "alterer"},
}
}
},

View file

@ -127,6 +127,100 @@ def test_analyze_transaction_operation(conn):
]
def test_analyze_savepoint_operation(conn):
analysis = analyze_sql_tables(conn, "savepoint s", database_name="data")
assert [operation_dict(operation) for operation in analysis.operations] == [
{
"operation": "savepoint",
"target_type": "transaction",
"database": None,
"sqlite_schema": None,
"table": None,
"target": "BEGIN s",
"columns": (),
"source": None,
"internal": False,
}
]
def test_analyze_function_operation(conn):
analysis = analyze_sql_tables(
conn,
"insert into dogs (name) values (upper(:name))",
{"name": "Cleo"},
database_name="data",
)
assert {
(
operation.operation,
operation.target_type,
operation.target,
operation.database,
operation.table,
)
for operation in analysis.operations
} == {
("insert", "table", "dogs", "data", "dogs"),
("function", "function", "upper", None, None),
("read", "table", "dogs", "data", "dogs"),
("update", "table", "cats", "data", "cats"),
("read", "table", "cats", "data", "cats"),
("insert", "table", "log", "data", "log"),
}
def test_analyze_create_virtual_table_operation():
conn = sqlite3.connect(":memory:")
try:
analysis = analyze_sql_tables(
conn,
"create virtual table docs using fts5(body)",
database_name="data",
)
finally:
conn.close()
assert {
"operation": "create",
"target_type": "virtual-table",
"database": "data",
"sqlite_schema": "main",
"table": "docs",
"target": "docs",
"columns": (),
"source": None,
"internal": False,
} in [operation_dict(operation) for operation in analysis.operations]
def test_analyze_create_table_as_select_function_is_not_internal():
conn = sqlite3.connect(":memory:")
try:
conn.execute("create table secret(value text)")
analysis = analyze_sql_tables(
conn,
"create table copied as select substr(value, 1, 1) from secret",
database_name="data",
)
finally:
conn.close()
assert {
"operation": "function",
"target_type": "function",
"database": None,
"sqlite_schema": None,
"table": None,
"target": "substr",
"columns": (),
"source": None,
"internal": False,
} in [operation_dict(operation) for operation in analysis.operations]
def test_analyze_insert_tables(conn):
analysis = analyze_sql_tables(
conn,