Refactored for code readability

This commit is contained in:
Simon Willison 2026-05-28 09:50:56 -07:00
commit 8bd7e165f4
6 changed files with 339 additions and 221 deletions

View file

@ -42,7 +42,7 @@ from jinja2.exceptions import TemplateNotFound
from .events import Event
from .column_types import SQLiteType
from . import stored_queries
from . import stored_queries, write_sql
from .views import Context
from .views.database import (
database_download,
@ -1197,7 +1197,8 @@ class Datasette:
async def ensure_query_write_permissions(
self, database, sql, *, actor=None, params=None, analysis=None
):
return await stored_queries.ensure_query_write_permissions(
# Raise Forbidden or QueryWriteRejected if SQL should not run
return await write_sql.ensure_query_write_permissions(
self, database, sql, actor=actor, params=params, analysis=analysis
)

View file

@ -2,26 +2,13 @@ from __future__ import annotations
from dataclasses import dataclass
import json
from typing import Any, Iterable, TYPE_CHECKING
from typing import Any, Iterable
from .resources import DatabaseResource, TableResource
from .permissions import Resource
from .utils import named_parameters, sqlite3, tilde_encode, urlsafe_components
from .utils.asgi import Forbidden
from .utils.sql_analysis import Operation, SQLAnalysis
if TYPE_CHECKING:
from .app import Datasette
from .utils import tilde_encode, urlsafe_components
UNCHANGED = object()
class QueryWriteRejected(Exception):
def __init__(self, message: str):
self.message = message
super().__init__(message)
QUERY_OPTION_FIELDS = (
"hide_sql",
"fragment",
@ -593,197 +580,3 @@ async def list_queries(
has_more=has_more,
limit=limit,
)
@dataclass(frozen=True)
class PermissionRequirement:
action: str
resource: Resource
def row_mutation_requirements(
database: str, table: str
) -> tuple[PermissionRequirement, ...]:
resource = TableResource(database=database, table=table)
return tuple(
PermissionRequirement(action=action, resource=resource)
for action in ("insert-row", "update-row", "delete-row")
)
def permission_requirements_for_operation(
operation: Operation,
) -> tuple[PermissionRequirement, ...]:
if (
operation.operation == "read"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return (
PermissionRequirement(
action="view-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
if (
operation.operation in {"insert", "update"}
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return row_mutation_requirements(
database=operation.database,
table=operation.table,
)
if (
operation.operation == "delete"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return (
PermissionRequirement(
action="delete-row",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
if operation.operation == "create" and operation.target_type == "table":
if operation.database is None:
return ()
return (
PermissionRequirement(
action="create-table",
resource=DatabaseResource(database=operation.database),
),
)
if (
operation.operation == "alter"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return (
PermissionRequirement(
action="alter-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
if (
operation.operation == "drop"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return (
PermissionRequirement(
action="drop-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
if (
operation.operation in {"create", "drop"}
and operation.target_type == "index"
and operation.database is not None
and operation.table is not None
):
return (
PermissionRequirement(
action="alter-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
return ()
def operation_should_be_ignored(operation: Operation) -> bool:
return operation.internal or operation.operation == "select"
def operation_forbidden_message(operation: Operation) -> str | None:
if operation.operation == "vacuum":
return "VACUUM is not allowed in user-supplied SQL"
if operation.operation in {"insert", "update", "delete"}:
if operation.table_kind == "virtual":
return "Writes to virtual tables are not allowed in user-supplied SQL"
if operation.table_kind == "shadow":
return "Writes to shadow tables are not allowed in user-supplied SQL"
return None
def operation_is_write(operation: Operation) -> bool:
return operation.operation in {
"insert",
"update",
"delete",
"create",
"alter",
"drop",
"begin",
"commit",
"rollback",
"savepoint",
"attach",
"detach",
"pragma",
"analyze",
"reindex",
"vacuum",
"unknown",
}
async def ensure_query_write_permissions(
datasette: Datasette,
database: str,
sql: str,
*,
actor: dict[str, object] | None = None,
params: dict[str, object] | None = None,
analysis: SQLAnalysis | None = None,
) -> SQLAnalysis:
db = datasette.get_database(database)
if analysis is None:
if params is None:
params = {name: "" for name in named_parameters(sql)}
try:
analysis = await db.analyze_sql(sql, params)
except sqlite3.DatabaseError as ex:
raise Forbidden(f"Could not analyze query: {ex}") from ex
for operation in analysis.operations:
if operation_should_be_ignored(operation):
continue
forbidden_message = operation_forbidden_message(operation)
if forbidden_message is not None:
raise QueryWriteRejected(forbidden_message)
permissions = permission_requirements_for_operation(operation)
if not permissions:
raise Forbidden(
"Unsupported SQL operation: {} {}".format(
operation.operation, operation.target_type
)
)
if operation.database != database:
raise Forbidden("Writable queries may not access attached databases")
for permission in permissions:
if not await datasette.allowed(
action=permission.action,
resource=permission.resource,
actor=actor,
):
raise Forbidden(
f"Permission denied: need {permission.action} "
f"on {permission.resource}"
)
return analysis

View file

@ -13,7 +13,8 @@ import textwrap
from datasette.events import AlterTableEvent, CreateTableEvent, InsertRowsEvent
from datasette.database import QueryInterrupted
from datasette.resources import DatabaseResource, QueryResource
from datasette.stored_queries import QueryWriteRejected, stored_query_to_dict
from datasette.stored_queries import stored_query_to_dict
from datasette.write_sql import QueryWriteRejected
from datasette.utils import (
add_cors_headers,
await_me_maybe,

View file

@ -3,11 +3,14 @@ import re
from datasette.resources import DatabaseResource
from datasette.stored_queries import (
QueryWriteRejected,
StoredQuery,
)
from datasette.write_sql import (
IgnoreWriteSqlOperation,
QueryWriteRejected,
RequireWriteSqlPermissions,
decision_for_write_sql_operation,
operation_is_write,
operation_should_be_ignored,
permission_requirements_for_operation,
)
from datasette.utils import (
named_parameters as derive_named_parameters,
@ -212,7 +215,9 @@ async def _analyze_user_query(datasette, db, sql, *, actor):
def _display_operations(analysis: SQLAnalysis) -> list[Operation]:
operations = []
for operation in analysis.operations:
if operation_should_be_ignored(operation):
if isinstance(
decision_for_write_sql_operation(operation), IgnoreWriteSqlOperation
):
continue
operations.append(operation)
return operations
@ -221,8 +226,12 @@ def _display_operations(analysis: SQLAnalysis) -> list[Operation]:
def _analysis_rows(analysis: SQLAnalysis) -> list[dict[str, object]]:
rows = []
for operation in _display_operations(analysis):
permissions = permission_requirements_for_operation(operation)
required_permission = ", ".join(permission.action for permission in permissions)
decision = decision_for_write_sql_operation(operation)
required_permission = (
", ".join(permission.action for permission in decision.permissions)
if isinstance(decision, RequireWriteSqlPermissions)
else ""
)
rows.append(
{
"operation": operation.operation,
@ -241,10 +250,10 @@ async def _analysis_rows_with_permissions(
rows = _analysis_rows(analysis)
is_write = _analysis_is_write(analysis)
for row, operation in zip(rows, _display_operations(analysis)):
permissions = permission_requirements_for_operation(operation)
if permissions:
decision = decision_for_write_sql_operation(operation)
if isinstance(decision, RequireWriteSqlPermissions):
row["allowed"] = True
for permission in permissions:
for permission in decision.permissions:
if not await datasette.allowed(
action=permission.action,
resource=permission.resource,

255
datasette/write_sql.py Normal file
View file

@ -0,0 +1,255 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from .permissions import Resource
from .resources import DatabaseResource, TableResource
from .utils import named_parameters, sqlite3
from .utils.asgi import Forbidden
from .utils.sql_analysis import Operation, SQLAnalysis
if TYPE_CHECKING:
from .app import Datasette
class QueryWriteRejected(Exception):
def __init__(self, message: str):
self.message = message
super().__init__(message)
@dataclass(frozen=True)
class PermissionRequirement:
action: str
resource: Resource
PermissionRequirements = tuple[PermissionRequirement, ...]
class WriteSqlOperationDecision:
"""What Datasette should do with one operation in user-supplied write SQL."""
@dataclass(frozen=True)
class IgnoreWriteSqlOperation(WriteSqlOperationDecision):
reason: str
@dataclass(frozen=True)
class RequireWriteSqlPermissions(WriteSqlOperationDecision):
permissions: PermissionRequirements
@dataclass(frozen=True)
class RejectWriteSqlOperation(WriteSqlOperationDecision):
message: str
@dataclass(frozen=True)
class UnsupportedWriteSqlOperation(WriteSqlOperationDecision):
message: str
def row_mutation_requirements(database: str, table: str) -> PermissionRequirements:
resource = TableResource(database=database, table=table)
return tuple(
PermissionRequirement(action=action, resource=resource)
for action in ("insert-row", "update-row", "delete-row")
)
def decision_for_write_sql_operation(
operation: Operation,
) -> WriteSqlOperationDecision:
unsupported_message = (
f"Unsupported SQL operation: {operation.operation} {operation.target_type}"
)
if operation.internal:
return IgnoreWriteSqlOperation("internal SQLite operation")
if operation.operation == "select":
return IgnoreWriteSqlOperation("select statement")
if operation.operation == "vacuum":
return RejectWriteSqlOperation("VACUUM is not allowed in user-supplied SQL")
if operation.operation in {"insert", "update", "delete"}:
if operation.table_kind == "virtual":
return RejectWriteSqlOperation(
"Writes to virtual tables are not allowed in user-supplied SQL"
)
if operation.table_kind == "shadow":
return RejectWriteSqlOperation(
"Writes to shadow tables are not allowed in user-supplied SQL"
)
if operation.operation == "function":
# SQL functions currently have no Datasette permission mapping. They are
# rejected by the user-supplied write SQL allow-list as unsupported.
return UnsupportedWriteSqlOperation(unsupported_message)
if (
operation.operation == "read"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="view-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
if (
operation.operation in {"insert", "update"}
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
row_mutation_requirements(
database=operation.database,
table=operation.table,
)
)
if (
operation.operation == "delete"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="delete-row",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
if operation.operation == "create" and operation.target_type == "table":
if operation.database is None:
return UnsupportedWriteSqlOperation(unsupported_message)
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="create-table",
resource=DatabaseResource(database=operation.database),
),
)
)
if (
operation.operation == "alter"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="alter-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
if (
operation.operation == "drop"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="drop-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
if (
operation.operation in {"create", "drop"}
and operation.target_type == "index"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="alter-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
return UnsupportedWriteSqlOperation(unsupported_message)
def operation_is_write(operation: Operation) -> bool:
return operation.operation in {
"insert",
"update",
"delete",
"create",
"alter",
"drop",
"begin",
"commit",
"rollback",
"savepoint",
"attach",
"detach",
"pragma",
"analyze",
"reindex",
"vacuum",
"unknown",
}
async def ensure_query_write_permissions(
datasette: Datasette,
database: str,
sql: str,
*,
actor: dict[str, object] | None = None,
params: dict[str, object] | None = None,
analysis: SQLAnalysis | None = None,
) -> SQLAnalysis:
db = datasette.get_database(database)
if analysis is None:
if params is None:
params = {name: "" for name in named_parameters(sql)}
try:
analysis = await db.analyze_sql(sql, params)
except sqlite3.DatabaseError as ex:
raise Forbidden(f"Could not analyze query: {ex}") from ex
for operation in analysis.operations:
decision = decision_for_write_sql_operation(operation)
if isinstance(decision, IgnoreWriteSqlOperation):
continue
if isinstance(decision, RejectWriteSqlOperation):
raise QueryWriteRejected(decision.message)
if isinstance(decision, UnsupportedWriteSqlOperation):
raise Forbidden(decision.message)
permissions = decision.permissions
if operation.database != database:
raise Forbidden("Writable queries may not access attached databases")
for permission in permissions:
if not await datasette.allowed(
action=permission.action,
resource=permission.resource,
actor=actor,
):
raise Forbidden(
f"Permission denied: need {permission.action} "
f"on {permission.resource}"
)
return analysis

59
tests/test_write_sql.py Normal file
View file

@ -0,0 +1,59 @@
from datasette.utils.sql_analysis import Operation
from datasette.write_sql import (
IgnoreWriteSqlOperation,
RejectWriteSqlOperation,
RequireWriteSqlPermissions,
UnsupportedWriteSqlOperation,
WriteSqlOperationDecision,
decision_for_write_sql_operation,
)
def test_decision_for_write_sql_operation_ignores_internal_and_select_operations():
internal_decision = decision_for_write_sql_operation(
Operation("read", "schema", None, None, "main", internal=True)
)
select_decision = decision_for_write_sql_operation(
Operation("select", "statement", None, None, None)
)
assert isinstance(internal_decision, IgnoreWriteSqlOperation)
assert isinstance(internal_decision, WriteSqlOperationDecision)
assert isinstance(select_decision, IgnoreWriteSqlOperation)
assert isinstance(select_decision, WriteSqlOperationDecision)
def test_decision_for_write_sql_operation_requires_table_write_permissions():
decision = decision_for_write_sql_operation(
Operation("insert", "table", "data", "dogs", None)
)
assert isinstance(decision, RequireWriteSqlPermissions)
assert [permission.action for permission in decision.permissions] == [
"insert-row",
"update-row",
"delete-row",
]
assert [str(permission.resource) for permission in decision.permissions] == [
"data/dogs",
"data/dogs",
"data/dogs",
]
def test_decision_for_write_sql_operation_rejects_vacuum():
decision = decision_for_write_sql_operation(
Operation("vacuum", "statement", None, None, None)
)
assert isinstance(decision, RejectWriteSqlOperation)
assert decision.message == "VACUUM is not allowed in user-supplied SQL"
def test_decision_for_write_sql_operation_reports_unsupported_functions():
decision = decision_for_write_sql_operation(
Operation("function", "function", None, None, None, target="upper")
)
assert isinstance(decision, UnsupportedWriteSqlOperation)
assert decision.message == "Unsupported SQL operation: function function"