Deny VACUUM in user-authored SQL

Reject VACUUM explicitly during write-query permission analysis so arbitrary write SQL and untrusted stored write queries cannot run it, even when the actor has execute-write-sql.

Refs https://github.com/simonw/datasette/pull/2749#issuecomment-4559073803 (P3)
This commit is contained in:
Simon Willison 2026-05-27 16:51:12 -07:00
commit 11bddc8919
5 changed files with 199 additions and 8 deletions

View file

@ -15,6 +15,13 @@ if TYPE_CHECKING:
UNCHANGED = object()
class QueryWriteRejected(Exception):
def __init__(self, message: str):
self.message = message
super().__init__(message)
QUERY_OPTION_FIELDS = (
"hide_sql",
"fragment",
@ -703,6 +710,12 @@ def operation_should_be_ignored(operation: Operation) -> bool:
return operation.internal or operation.operation == "select"
def operation_forbidden_message(operation: Operation) -> str | None:
if operation.operation == "vacuum":
return "VACUUM is not allowed in user-supplied SQL"
return None
def operation_is_write(operation: Operation) -> bool:
return operation.operation in {
"insert",
@ -746,6 +759,9 @@ async def ensure_query_write_permissions(
for operation in analysis.operations:
if operation_should_be_ignored(operation):
continue
forbidden_message = operation_forbidden_message(operation)
if forbidden_message is not None:
raise QueryWriteRejected(forbidden_message)
permissions = permission_requirements_for_operation(operation)
if not permissions:
raise Forbidden(

View file

@ -13,7 +13,7 @@ import textwrap
from datasette.events import AlterTableEvent, CreateTableEvent, InsertRowsEvent
from datasette.database import QueryInterrupted
from datasette.resources import DatabaseResource, QueryResource
from datasette.stored_queries import stored_query_to_dict
from datasette.stored_queries import QueryWriteRejected, stored_query_to_dict
from datasette.utils import (
add_cors_headers,
await_me_maybe,
@ -453,9 +453,24 @@ class QueryView(View):
):
raise Forbidden("You do not have permission to view this query")
await _ensure_stored_query_execution_permissions(
datasette, db, stored_query, request.actor
)
try:
await _ensure_stored_query_execution_permissions(
datasette, db, stored_query, request.actor
)
except QueryWriteRejected as ex:
if request.headers.get("accept") == "application/json" or request.args.get(
"_json"
):
return Response.json(
{
"ok": False,
"message": ex.message,
"redirect": None,
},
status=403,
)
datasette.add_message(request, ex.message, datasette.ERROR)
return Response.redirect(stored_query.on_error_redirect or request.path)
# If database is immutable, return an error
if not db.is_mutable:

View file

@ -163,13 +163,15 @@ class ExecuteWriteView(BaseView):
except QueryValidationError as ex:
if _wants_json(request, is_json, data):
return _block_framing(_error([ex.message], ex.status))
if ex.flash:
self.ds.add_message(request, ex.message, self.ds.ERROR)
return await self._render_form(
request,
db,
sql=sql or "",
parameter_values=provided_params,
analysis_error=ex.message,
execution_message=ex.message,
analysis_error=None if ex.flash else ex.message,
execution_message=None if ex.flash else ex.message,
execution_ok=False,
status=ex.status,
)

View file

@ -3,6 +3,7 @@ import re
from datasette.resources import DatabaseResource
from datasette.stored_queries import (
QueryWriteRejected,
StoredQuery,
operation_is_write,
operation_should_be_ignored,
@ -47,9 +48,11 @@ _query_write_fields = {
class QueryValidationError(Exception):
def __init__(self, message, status=400):
def __init__(self, message, status=400, *, flash=False):
self.message = message
self.status = status
self.flash = flash
super().__init__(message)
def _actor_id(actor):
@ -194,6 +197,8 @@ async def _analyze_user_query(datasette, db, sql, *, actor):
await datasette.ensure_query_write_permissions(
db.name, sql, actor=actor, analysis=analysis
)
except QueryWriteRejected as ex:
raise QueryValidationError(ex.message, status=403, flash=True) from ex
except Forbidden as ex:
raise QueryValidationError(str(ex), status=403) from ex
else:
@ -297,6 +302,8 @@ async def _prepare_execute_write(datasette, db, sql, params, actor):
await datasette.ensure_query_write_permissions(
db.name, sql, actor=actor, analysis=analysis
)
except QueryWriteRejected as ex:
raise QueryValidationError(ex.message, status=403, flash=True) from ex
except Forbidden as ex:
raise QueryValidationError(str(ex), status=403) from ex
return parameter_names, params, analysis

View file

@ -2038,10 +2038,161 @@ async def test_execute_write_rejects_vacuum_operation():
assert denied_response.status_code == 403
assert denied_response.json()["errors"] == [
"Unsupported SQL operation: vacuum database"
"VACUUM is not allowed in user-supplied SQL"
]
@pytest.mark.asyncio
async def test_execute_write_form_rejects_vacuum_operation_with_flash_error():
ds = Datasette(
memory=True,
default_deny=True,
config={
"databases": {
"data": {
"permissions": {
"view-database": {"id": "writer"},
"execute-write-sql": {"id": "writer"},
}
}
}
},
)
ds.add_memory_database("execute_write_vacuum_operation_form", name="data")
await ds.invoke_startup()
denied_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "writer"},
data={"sql": "vacuum"},
)
assert denied_response.status_code == 403
assert (
'<p class="message-error">VACUUM is not allowed in user-supplied SQL</p>'
in denied_response.text
)
assert denied_response.text.count("VACUUM is not allowed in user-supplied SQL") == 1
@pytest.mark.asyncio
async def test_untrusted_stored_write_query_rejects_vacuum_operation():
ds = Datasette(
memory=True,
default_deny=True,
config={
"databases": {
"data": {
"permissions": {
"view-database": {"id": "writer"},
"view-query": {"id": "writer"},
"execute-write-sql": {"id": "writer"},
}
}
}
},
)
ds.add_memory_database("stored_query_vacuum_operation", name="data")
await ds.invoke_startup()
await ds.add_query(
"data",
"vacuum_db",
"vacuum",
is_write=True,
is_trusted=False,
source="user",
owner_id="writer",
)
denied_response = await ds.client.post(
"/data/vacuum_db?_json=1",
actor={"id": "writer"},
data={},
)
assert denied_response.status_code == 403
assert "VACUUM is not allowed in user-supplied SQL" in denied_response.text
@pytest.mark.asyncio
async def test_untrusted_stored_write_query_rejects_vacuum_operation_with_flash_error():
ds = Datasette(
memory=True,
default_deny=True,
config={
"databases": {
"data": {
"permissions": {
"view-database": {"id": "writer"},
"view-query": {"id": "writer"},
"execute-write-sql": {"id": "writer"},
}
}
}
},
)
ds.add_memory_database("stored_query_vacuum_operation_form", name="data")
await ds.invoke_startup()
await ds.add_query(
"data",
"vacuum_db",
"vacuum",
is_write=True,
is_trusted=False,
source="user",
owner_id="writer",
)
denied_response = await ds.client.post(
"/data/vacuum_db",
actor={"id": "writer"},
data={},
)
assert denied_response.status_code == 302
assert denied_response.headers["location"] == "/data/vacuum_db"
assert ds.unsign(denied_response.cookies["ds_messages"], "messages") == [
["VACUUM is not allowed in user-supplied SQL", ds.ERROR]
]
@pytest.mark.asyncio
async def test_trusted_stored_write_query_skips_vacuum_filtering():
ds = Datasette(
memory=True,
default_deny=True,
config={
"databases": {
"data": {
"permissions": {
"view-database": {"id": "writer"},
"view-query": {"id": "writer"},
}
}
}
},
)
ds.add_memory_database("trusted_stored_query_vacuum", name="data")
await ds.invoke_startup()
await ds.add_query(
"data",
"trusted_vacuum",
"vacuum",
is_write=True,
is_trusted=True,
source="config",
)
response = await ds.client.post(
"/data/trusted_vacuum?_json=1",
actor={"id": "writer"},
data={},
)
assert response.status_code == 200
assert response.json()["ok"] is True
@pytest.mark.asyncio
async def test_execute_write_create_table_uses_create_table_permission():
ds = Datasette(