Compare commits

..

15 commits

Author SHA1 Message Date
Simon Willison
cbe9594a3d Use SQLiteTableType directly in SQL analysis
Remove the redundant SQLTableKind alias from the write SQL analysis model. Operation.table_kind and the analyzer cache now use the SQLite metadata classification type directly, making the source of table-kind values clearer.
2026-05-28 11:00:04 -07:00
Simon Willison
b2b20b36c5 Document write SQL analyzer restrictions
Expand the unreleased changelog with the deny-by-default operation analysis model, SQL function handling, and the VACUUM and virtual/shadow table restrictions for user-supplied write SQL.

Clarify the /-/execute-write JSON API documentation with the same restrictions and DDL permission requirements.
2026-05-28 10:24:40 -07:00
Simon Willison
51dab16149 Allow SQL functions in SQL write queries
Closes #2751
2026-05-28 10:22:28 -07:00
Simon Willison
8bd7e165f4 Refactored for code readability 2026-05-28 09:50:56 -07:00
Simon Willison
2785fd29de Fix tests I just broke 2026-05-28 09:03:10 -07:00
Simon Willison
aaf00e9ec2 Refactor hidden_table_names() to use new implemenatation
Refs https://github.com/simonw/datasette/pull/2749#issuecomment-4565727978
2026-05-28 08:42:06 -07:00
Simon Willison
bcd989f4f8 Detect and disallow insert to virtual/shadow table
Refs https://github.com/simonw/datasette/pull/2749#issuecomment-4565727978
2026-05-28 08:36:59 -07:00
Simon Willison
0c5053cdf6 Docs for /<database>/-/execute-write JSON API
Closes #2750, refs #2742
2026-05-27 17:26:50 -07:00
Simon Willison
11bddc8919 Deny VACUUM in user-authored SQL
Reject VACUUM explicitly during write-query permission analysis so arbitrary write SQL and untrusted stored write queries cannot run it, even when the actor has execute-write-sql.

Refs https://github.com/simonw/datasette/pull/2749#issuecomment-4559073803 (P3)
2026-05-27 17:09:27 -07:00
Simon Willison
951f5a9f30 Detect VACUUM in SQL analysis
Refs https://github.com/simonw/datasette/pull/2749#issuecomment-4559073803
2026-05-27 16:30:21 -07:00
Simon Willison
1932f8429f Deny user-authored schema table reads in write SQL
Stop marking sqlite_master and sqlite_schema reads as internal as soon as the SQLite authorizer reports them. The later DDL-aware pass still treats schema catalog access as internal when it accompanies semantic CREATE, ALTER, or DROP operations.

This makes explicit catalog reads in write SQL fall through to the deny-by-default path as unsupported read schema operations, preventing queries from copying private table definitions into writable tables.

Refs https://github.com/simonw/datasette/pull/2749#issuecomment-4559073803
2026-05-27 16:14:56 -07:00
Simon Willison
03b2c66f63 Require full row mutation permissions for raw SQL
Raw SQL insert and update statements can have broader effects than their SQLite authorizer callbacks reveal. INSERT OR REPLACE and UPDATE OR REPLACE can delete conflicting rows while only surfacing insert or update operations.

Expand table insert and update operations to require insert-row, update-row, and delete-row together. Keep delete operations mapped to delete-row, and update the analysis UI/API to report and evaluate multiple required permissions for a single operation.

Refs https://github.com/simonw/datasette/pull/2749#issuecomment-4559083539
2026-05-27 15:17:22 -07:00
Simon Willison
86d0e7335f Deny unsupported write SQL operations by default
Require view-table permission for reads discovered inside write SQL analysis, including INSERT ... SELECT and CREATE TABLE ... AS SELECT.

Record additional SQLite authorizer callbacks as Operation values so unsupported functions, savepoints, virtual table DDL, and unknown callbacks are denied unless explicitly handled.
2026-05-27 14:52:52 -07:00
Simon Willison
737ff03efb Expanded analysis of SQL operations, refs #2748 2026-05-26 22:11:35 -07:00
Simon Willison
9f66cf72c1 Removed execute write SQL from query create page 2026-05-26 21:42:50 -07:00
21 changed files with 2481 additions and 268 deletions

View file

@ -42,7 +42,7 @@ from jinja2.exceptions import TemplateNotFound
from .events import Event from .events import Event
from .column_types import SQLiteType from .column_types import SQLiteType
from . import stored_queries from . import stored_queries, write_sql
from .views import Context from .views import Context
from .views.database import ( from .views.database import (
database_download, database_download,
@ -1197,7 +1197,8 @@ class Datasette:
async def ensure_query_write_permissions( async def ensure_query_write_permissions(
self, database, sql, *, actor=None, params=None, analysis=None self, database, sql, *, actor=None, params=None, analysis=None
): ):
return await stored_queries.ensure_query_write_permissions( # Raise Forbidden or QueryWriteRejected if SQL should not run
return await write_sql.ensure_query_write_permissions(
self, database, sql, actor=actor, params=params, analysis=analysis self, database, sql, actor=actor, params=params, analysis=analysis
) )

View file

@ -26,7 +26,7 @@ from .utils import (
table_column_details, table_column_details,
) )
from .utils.sql_analysis import SQLAnalysis, analyze_sql_tables from .utils.sql_analysis import SQLAnalysis, analyze_sql_tables
from .utils.sqlite import sqlite_version from .utils.sqlite import sqlite_hidden_table_names
from .inspect import inspect_hash from .inspect import inspect_hash
connections = threading.local() connections = threading.local()
@ -702,83 +702,7 @@ class Database:
t for t in db_config["tables"] if db_config["tables"][t].get("hidden") t for t in db_config["tables"] if db_config["tables"][t].get("hidden")
] ]
if sqlite_version()[1] >= 37: hidden_tables += await self.execute_fn(sqlite_hidden_table_names)
hidden_tables += [x[0] for x in await self.execute("""
with shadow_tables as (
select name
from pragma_table_list
where [type] = 'shadow'
order by name
),
core_tables as (
select name
from sqlite_master
WHERE name in ('sqlite_stat1', 'sqlite_stat2', 'sqlite_stat3', 'sqlite_stat4')
OR substr(name, 1, 1) == '_'
),
combined as (
select name from shadow_tables
union all
select name from core_tables
)
select name from combined order by 1
""")]
else:
hidden_tables += [x[0] for x in await self.execute("""
WITH base AS (
SELECT name
FROM sqlite_master
WHERE name IN ('sqlite_stat1', 'sqlite_stat2', 'sqlite_stat3', 'sqlite_stat4')
OR substr(name, 1, 1) == '_'
),
fts_suffixes AS (
SELECT column1 AS suffix
FROM (VALUES ('_data'), ('_idx'), ('_docsize'), ('_content'), ('_config'))
),
fts5_names AS (
SELECT name
FROM sqlite_master
WHERE sql LIKE '%VIRTUAL TABLE%USING FTS%'
),
fts5_shadow_tables AS (
SELECT
printf('%s%s', fts5_names.name, fts_suffixes.suffix) AS name
FROM fts5_names
JOIN fts_suffixes
),
fts3_suffixes AS (
SELECT column1 AS suffix
FROM (VALUES ('_content'), ('_segdir'), ('_segments'), ('_stat'), ('_docsize'))
),
fts3_names AS (
SELECT name
FROM sqlite_master
WHERE sql LIKE '%VIRTUAL TABLE%USING FTS3%'
OR sql LIKE '%VIRTUAL TABLE%USING FTS4%'
),
fts3_shadow_tables AS (
SELECT
printf('%s%s', fts3_names.name, fts3_suffixes.suffix) AS name
FROM fts3_names
JOIN fts3_suffixes
),
final AS (
SELECT name FROM base
UNION ALL
SELECT name FROM fts5_shadow_tables
UNION ALL
SELECT name FROM fts3_shadow_tables
)
SELECT name FROM final ORDER BY 1
""")]
# Also hide any FTS tables that have a content= argument
hidden_tables += [x[0] for x in await self.execute("""
SELECT name
FROM sqlite_master
WHERE sql LIKE '%VIRTUAL TABLE%'
AND sql LIKE '%USING FTS%'
AND sql LIKE '%content=%'
""")]
has_spatialite = await self.execute_fn(detect_spatialite) has_spatialite = await self.execute_fn(detect_spatialite)
if has_spatialite: if has_spatialite:

View file

@ -58,6 +58,16 @@ class Resource(ABC):
self.child = child self.child = child
self._private = None # Sentinel to track if private was set self._private = None # Sentinel to track if private was set
def __str__(self) -> str:
return "/".join(
str(part) for part in (self.parent, self.child) if part is not None
)
def __repr__(self) -> str:
return "{}(parent={!r}, child={!r})".format(
self.__class__.__name__, self.parent, self.child
)
@property @property
def private(self) -> bool: def private(self) -> bool:
""" """

View file

@ -4,12 +4,11 @@ from dataclasses import dataclass
import json import json
from typing import Any, Iterable from typing import Any, Iterable
from .resources import TableResource from .utils import tilde_encode, urlsafe_components
from .utils import named_parameters, sqlite3, tilde_encode, urlsafe_components
from .utils.asgi import Forbidden
UNCHANGED = object() UNCHANGED = object()
QUERY_OPTION_FIELDS = ( QUERY_OPTION_FIELDS = (
"hide_sql", "hide_sql",
"fragment", "fragment",
@ -581,43 +580,3 @@ async def list_queries(
has_more=has_more, has_more=has_more,
limit=limit, limit=limit,
) )
async def ensure_query_write_permissions(
datasette: Any,
database: str,
sql: str,
*,
actor: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
analysis: Any = None,
) -> Any:
write_actions = {
"insert": "insert-row",
"update": "update-row",
"delete": "delete-row",
}
db = datasette.get_database(database)
if analysis is None:
if params is None:
params = {name: "" for name in named_parameters(sql)}
try:
analysis = await db.analyze_sql(sql, params)
except sqlite3.DatabaseError as ex:
raise Forbidden(f"Could not analyze query: {ex}") from ex
for access in analysis.table_accesses:
action = write_actions.get(access.operation)
if action is None:
continue
if access.database != database:
raise Forbidden("Writable queries may not write to attached databases")
if not await datasette.allowed(
action=action,
resource=TableResource(database=access.database, table=access.table),
actor=actor,
):
raise Forbidden(
f"Permission denied: need {action} on {access.database}/{access.table}"
)
return analysis

View file

@ -106,9 +106,6 @@ form.sql .query-create-sql textarea#sql-editor {
.query-create-analysis-note { .query-create-analysis-note {
margin: 0; margin: 0;
} }
.query-create-action {
margin: 0.35rem 0 1rem;
}
.query-create-analysis { .query-create-analysis {
margin-top: 0.8rem; margin-top: 0.8rem;
} }
@ -171,10 +168,6 @@ form.sql .query-create-sql textarea#sql-editor {
<label><input type="checkbox" name="is_private" value="1"{% if is_private %} checked{% endif %}> Private</label> <label><input type="checkbox" name="is_private" value="1"{% if is_private %} checked{% endif %}> Private</label>
<span class="query-create-option-note">Queries marked private can only be seen by you, their creator.</span> <span class="query-create-option-note">Queries marked private can only be seen by you, their creator.</span>
</p> </p>
{% if sql and analysis_is_write %}
<p class="query-create-action"><a href="{{ urls.database(database) }}/-/execute-write?{{ {'sql': sql}|urlencode|safe }}">Execute write SQL</a></p>
{% endif %}
<p class="query-create-submit"><input type="submit" value="Save query" data-query-create-submit{% if save_disabled %} disabled{% endif %}></p> <p class="query-create-submit"><input type="submit" value="Save query" data-query-create-submit{% if save_disabled %} disabled{% endif %}></p>
<div class="query-create-analysis" id="query-create-analysis-section"{% if not has_sql %} hidden{% endif %}> <div class="query-create-analysis" id="query-create-analysis-section"{% if not has_sql %} hidden{% endif %}>

View file

@ -1,24 +1,79 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import Literal from typing import Literal
from datasette.utils.sqlite import sqlite3 from datasette.utils.sqlite import SQLiteTableType, sqlite3, sqlite_table_type
SQLOperation = Literal[
"read",
"insert",
"update",
"delete",
"select",
"function",
"create",
"alter",
"drop",
"begin",
"commit",
"rollback",
"savepoint",
"attach",
"detach",
"pragma",
"analyze",
"reindex",
"vacuum",
"unknown",
]
SQLTargetType = Literal[
"table",
"index",
"view",
"trigger",
"virtual-table",
"schema",
"statement",
"transaction",
"database",
"pragma",
"function",
"unknown",
]
SQLTableOperation = Literal["read", "insert", "update", "delete"] SQLTableOperation = Literal["read", "insert", "update", "delete"]
SQLSchemaOperation = Literal["create", "drop"]
SQLSchemaTargetType = Literal["index", "table", "trigger", "view", "virtual-table"]
@dataclass(frozen=True) @dataclass(frozen=True)
class SQLTableAccess: class Operation:
operation: SQLTableOperation operation: SQLOperation
target_type: SQLTargetType
database: str | None database: str | None
table: str table: str | None
sqlite_schema: str | None sqlite_schema: str | None
table_kind: SQLiteTableType | None = None
target: str | None = None
columns: tuple[str, ...] = () columns: tuple[str, ...] = ()
source: str | None = None source: str | None = None
internal: bool = False
@dataclass(frozen=True) @dataclass(frozen=True)
class SQLAnalysis: class SQLAnalysis:
table_accesses: tuple[SQLTableAccess, ...] operations: tuple[Operation, ...]
# Hashable dict key for grouping repeated authorizer callbacks while collecting columns.
@dataclass(frozen=True)
class OperationKey:
operation: SQLOperation
target_type: SQLTargetType
database: str | None
table: str | None
sqlite_schema: str | None
target: str | None
source: str | None
internal: bool
_ACTION_TO_OPERATION: dict[int, SQLTableOperation] = { _ACTION_TO_OPERATION: dict[int, SQLTableOperation] = {
@ -28,6 +83,118 @@ _ACTION_TO_OPERATION: dict[int, SQLTableOperation] = {
sqlite3.SQLITE_DELETE: "delete", sqlite3.SQLITE_DELETE: "delete",
} }
# Values are (operation, target_type) pairs used to construct Operation objects.
_CREATE_ACTIONS: dict[int, tuple[SQLSchemaOperation, SQLSchemaTargetType]] = {
sqlite3.SQLITE_CREATE_INDEX: ("create", "index"),
sqlite3.SQLITE_CREATE_TABLE: ("create", "table"),
sqlite3.SQLITE_CREATE_TRIGGER: ("create", "trigger"),
sqlite3.SQLITE_CREATE_VIEW: ("create", "view"),
}
_DROP_ACTIONS: dict[int, tuple[SQLSchemaOperation, SQLSchemaTargetType]] = {
sqlite3.SQLITE_DROP_INDEX: ("drop", "index"),
sqlite3.SQLITE_DROP_TABLE: ("drop", "table"),
sqlite3.SQLITE_DROP_TRIGGER: ("drop", "trigger"),
sqlite3.SQLITE_DROP_VIEW: ("drop", "view"),
}
def _add_schema_action(
action_name: str,
operation: SQLSchemaOperation,
target_type: SQLSchemaTargetType,
) -> None:
action_value = getattr(sqlite3, action_name, None)
if action_value is not None:
actions = _CREATE_ACTIONS if operation == "create" else _DROP_ACTIONS
actions[action_value] = (operation, target_type)
_TEMP_SCHEMA_ACTIONS: tuple[
tuple[str, SQLSchemaOperation, SQLSchemaTargetType], ...
] = (
("SQLITE_CREATE_TEMP_INDEX", "create", "index"),
("SQLITE_CREATE_TEMP_TABLE", "create", "table"),
("SQLITE_CREATE_TEMP_TRIGGER", "create", "trigger"),
("SQLITE_CREATE_TEMP_VIEW", "create", "view"),
("SQLITE_DROP_TEMP_INDEX", "drop", "index"),
("SQLITE_DROP_TEMP_TABLE", "drop", "table"),
("SQLITE_DROP_TEMP_TRIGGER", "drop", "trigger"),
("SQLITE_DROP_TEMP_VIEW", "drop", "view"),
)
for schema_action in _TEMP_SCHEMA_ACTIONS:
_add_schema_action(*schema_action)
_VTABLE_SCHEMA_ACTIONS: tuple[
tuple[str, SQLSchemaOperation, SQLSchemaTargetType], ...
] = (
("SQLITE_CREATE_VTABLE", "create", "virtual-table"),
("SQLITE_DROP_VTABLE", "drop", "virtual-table"),
)
for schema_action in _VTABLE_SCHEMA_ACTIONS:
_add_schema_action(*schema_action)
_SQLITE_SCHEMA_TABLES = {
"sqlite_master",
"sqlite_schema",
"sqlite_temp_master",
"sqlite_temp_schema",
}
_SQLITE_INTERNAL_SCHEMA_FUNCTIONS = {
"length",
"like",
"printf",
"sqlite_drop_column",
"sqlite_rename_column",
"sqlite_rename_quotefix",
"sqlite_rename_table",
"sqlite_rename_test",
"substr",
}
_AUTHORIZER_ACTION_NAMES = {
getattr(sqlite3, name): name
for name in (
"SQLITE_CREATE_INDEX",
"SQLITE_CREATE_TABLE",
"SQLITE_CREATE_TEMP_INDEX",
"SQLITE_CREATE_TEMP_TABLE",
"SQLITE_CREATE_TEMP_TRIGGER",
"SQLITE_CREATE_TEMP_VIEW",
"SQLITE_CREATE_TRIGGER",
"SQLITE_CREATE_VIEW",
"SQLITE_DELETE",
"SQLITE_DROP_INDEX",
"SQLITE_DROP_TABLE",
"SQLITE_DROP_TEMP_INDEX",
"SQLITE_DROP_TEMP_TABLE",
"SQLITE_DROP_TEMP_TRIGGER",
"SQLITE_DROP_TEMP_VIEW",
"SQLITE_DROP_TRIGGER",
"SQLITE_DROP_VIEW",
"SQLITE_INSERT",
"SQLITE_PRAGMA",
"SQLITE_READ",
"SQLITE_SELECT",
"SQLITE_TRANSACTION",
"SQLITE_UPDATE",
"SQLITE_ATTACH",
"SQLITE_DETACH",
"SQLITE_ALTER_TABLE",
"SQLITE_REINDEX",
"SQLITE_ANALYZE",
"SQLITE_CREATE_VTABLE",
"SQLITE_DROP_VTABLE",
"SQLITE_FUNCTION",
"SQLITE_SAVEPOINT",
"SQLITE_RECURSIVE",
)
if hasattr(sqlite3, name)
}
def _allow_authorizer_action(*args):
return sqlite3.SQLITE_OK
def analyze_sql_tables( def analyze_sql_tables(
conn, conn,
@ -38,15 +205,13 @@ def analyze_sql_tables(
schema_to_database: dict[str, str] | None = None, schema_to_database: dict[str, str] | None = None,
) -> SQLAnalysis: ) -> SQLAnalysis:
""" """
Return tables accessed by a SQL statement according to SQLite's authorizer. Return operations performed by a SQL statement according to SQLite's authorizer.
This function is synchronous and connection-based. It temporarily installs a This function is synchronous and connection-based. It temporarily installs a
SQLite authorizer, prepares ``EXPLAIN <sql>``, and returns the table access SQLite authorizer, prepares ``EXPLAIN <sql>``, and returns the operation
callbacks observed while SQLite compiles the statement. callbacks observed while SQLite compiles the statement.
""" """
accesses: dict[ operations: dict[OperationKey, set[str]] = {}
tuple[SQLTableOperation, str | None, str, str | None, str | None], set[str]
] = {}
def database_for_schema(sqlite_schema): def database_for_schema(sqlite_schema):
if schema_to_database and sqlite_schema in schema_to_database: if schema_to_database and sqlite_schema in schema_to_database:
@ -55,45 +220,331 @@ def analyze_sql_tables(
return database_name return database_name
return sqlite_schema return sqlite_schema
def authorizer(action, arg1, arg2, sqlite_schema, source): def record(
operation = _ACTION_TO_OPERATION.get(action) operation: SQLOperation,
if operation is None or arg1 is None: target_type: SQLTargetType,
return sqlite3.SQLITE_OK *,
database: str | None,
key = ( table: str | None,
operation, sqlite_schema: str | None,
database_for_schema(sqlite_schema), target: str | None,
arg1, source: str | None,
sqlite_schema, column: str | None = None,
source, internal: bool = False,
) ):
columns = accesses.setdefault(key, set()) key = OperationKey(
if operation in ("read", "update") and arg2 is not None:
columns.add(arg2)
return sqlite3.SQLITE_OK
conn.set_authorizer(authorizer)
try:
conn.execute("EXPLAIN " + sql, params if params is not None else {}).fetchall()
finally:
conn.set_authorizer(None)
return SQLAnalysis(
table_accesses=tuple(
SQLTableAccess(
operation=operation, operation=operation,
target_type=target_type,
database=database, database=database,
table=table, table=table,
sqlite_schema=sqlite_schema, sqlite_schema=sqlite_schema,
columns=tuple(sorted(columns)), target=target,
source=source,
internal=internal,
)
columns = operations.setdefault(key, set())
if column is not None:
columns.add(column)
def authorizer(action, arg1, arg2, sqlite_schema, source):
operation = _ACTION_TO_OPERATION.get(action)
if operation is not None and arg1 is not None:
target_type = "schema" if arg1 in _SQLITE_SCHEMA_TABLES else "table"
column = (
arg2 if operation in ("read", "update") and arg2 is not None else None
)
record(
operation,
target_type,
database=database_for_schema(sqlite_schema),
table=arg1 if target_type == "table" else None,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
column=column,
)
return sqlite3.SQLITE_OK
create_operation = _CREATE_ACTIONS.get(action)
if create_operation is not None and arg1 is not None:
operation, target_type = create_operation
related_table = arg2 if target_type in {"index", "trigger"} else arg1
record(
operation,
target_type,
database=database_for_schema(sqlite_schema),
table=related_table,
sqlite_schema=sqlite_schema,
target=arg1,
source=source, source=source,
) )
for ( return sqlite3.SQLITE_OK
drop_operation = _DROP_ACTIONS.get(action)
if drop_operation is not None and arg1 is not None:
operation, target_type = drop_operation
related_table = arg2 if target_type in {"index", "trigger"} else arg1
record(
operation, operation,
database, target_type,
table, database=database_for_schema(sqlite_schema),
sqlite_schema, table=related_table,
source, sqlite_schema=sqlite_schema,
), columns in accesses.items() target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_ALTER_TABLE and arg2 is not None:
record(
"alter",
"table",
database=database_for_schema(arg1),
table=arg2,
sqlite_schema=arg1,
target=arg2,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_TRANSACTION and arg1 is not None:
record(
arg1.lower(),
"transaction",
database=None,
table=None,
sqlite_schema=None,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_ATTACH and arg1 is not None:
record(
"attach",
"database",
database=None,
table=None,
sqlite_schema=None,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_DETACH and arg1 is not None:
record(
"detach",
"database",
database=None,
table=None,
sqlite_schema=None,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_PRAGMA and arg1 is not None:
record(
"pragma",
"pragma",
database=None,
table=None,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_ANALYZE:
record(
"analyze",
"database" if arg1 is None else "table",
database=database_for_schema(sqlite_schema),
table=arg1,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_REINDEX and arg1 is not None:
record(
"reindex",
"index",
database=database_for_schema(sqlite_schema),
table=None,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_SELECT:
record(
"select",
"statement",
database=None,
table=None,
sqlite_schema=sqlite_schema,
target=None,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_FUNCTION and arg2 is not None:
record(
"function",
"function",
database=None,
table=None,
sqlite_schema=sqlite_schema,
target=arg2,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_SAVEPOINT and arg1 is not None:
record(
"savepoint",
"transaction",
database=None,
table=None,
sqlite_schema=sqlite_schema,
target="{} {}".format(arg1, arg2) if arg2 is not None else arg1,
source=source,
)
return sqlite3.SQLITE_OK
action_name = _AUTHORIZER_ACTION_NAMES.get(action, "SQLITE_{}".format(action))
record(
"unknown",
"unknown",
database=database_for_schema(sqlite_schema),
table=None,
sqlite_schema=sqlite_schema,
target=action_name,
source=source,
)
return sqlite3.SQLITE_OK
table_kind_cache: dict[tuple[str | None, str], SQLiteTableType | None] = {}
conn.set_authorizer(authorizer)
try:
explain_rows = conn.execute(
"EXPLAIN " + sql, params if params is not None else {}
).fetchall()
# Passing None before these lookups leaves a failing callback installed
# on Python 3.10, so use a permissive callback until they are complete.
conn.set_authorizer(_allow_authorizer_action)
if not operations:
vacuum_row = next((row for row in explain_rows if row[1] == "Vacuum"), None)
if vacuum_row is not None:
schema_by_index = {
row[0]: row[1] for row in conn.execute("PRAGMA database_list")
}
sqlite_schema = schema_by_index.get(vacuum_row[2])
database = database_for_schema(sqlite_schema)
record(
"vacuum",
"database",
database=database,
table=None,
sqlite_schema=sqlite_schema,
target=database,
source=None,
)
else:
record(
"unknown",
"statement",
database=database_name,
table=None,
sqlite_schema=None,
target=None,
source=None,
)
for key in operations:
if (
key.target_type == "table"
and key.operation in {"read", "insert", "update", "delete"}
and key.table is not None
):
cache_key = (key.sqlite_schema, key.table)
if cache_key not in table_kind_cache:
table_kind_cache[cache_key] = sqlite_table_type(
conn, key.table, schema=key.sqlite_schema
)
finally:
conn.set_authorizer(None)
has_schema_operation = any(
key.target_type in {"table", "index", "view", "trigger", "virtual-table"}
and key.operation in {"create", "alter", "drop"}
for key in operations
)
dropped_tables = {
(key.database, key.table)
for key in operations
if key.operation == "drop" and key.target_type == "table"
}
def key_is_drop_table_delete(key: OperationKey) -> bool:
return (
key.operation == "delete"
and key.target_type == "table"
and (key.database, key.table) in dropped_tables
)
has_user_table_access_in_schema_operation = any(
key.operation in {"read", "insert", "update", "delete"}
and key.target_type == "table"
and not key.internal
and not key_is_drop_table_delete(key)
for key in operations
)
def operation_is_internal(key: OperationKey) -> bool:
if key.internal or (has_schema_operation and key.target_type == "schema"):
return True
if has_schema_operation and key.operation == "reindex":
return True
if (
has_schema_operation
and not has_user_table_access_in_schema_operation
and key.operation == "function"
and key.target in _SQLITE_INTERNAL_SCHEMA_FUNCTIONS
):
return True
if key_is_drop_table_delete(key):
return True
return False
def table_kind_for(key: OperationKey) -> SQLiteTableType | None:
if (
key.target_type != "table"
or key.operation not in {"read", "insert", "update", "delete"}
or key.table is None
):
return None
return table_kind_cache[(key.sqlite_schema, key.table)]
return SQLAnalysis(
operations=tuple(
Operation(
operation=key.operation,
target_type=key.target_type,
database=key.database,
table=key.table,
sqlite_schema=key.sqlite_schema,
table_kind=table_kind_for(key),
target=key.target,
columns=tuple(sorted(columns)),
source=key.source,
internal=operation_is_internal(key),
)
for key, columns in operations.items()
) )
) )

View file

@ -1,3 +1,6 @@
import re
from typing import Literal
using_pysqlite3 = False using_pysqlite3 = False
try: try:
import pysqlite3 as sqlite3 import pysqlite3 as sqlite3
@ -10,6 +13,18 @@ if hasattr(sqlite3, "enable_callback_tracebacks"):
sqlite3.enable_callback_tracebacks(True) sqlite3.enable_callback_tracebacks(True)
_cached_sqlite_version = None _cached_sqlite_version = None
SQLiteTableType = Literal["table", "view", "virtual", "shadow"]
_VIRTUAL_TABLE_MODULE_RE = re.compile(
r"\bCREATE\s+VIRTUAL\s+TABLE\b.*?\bUSING\s+([^\s(]+)",
re.IGNORECASE | re.DOTALL,
)
_VIRTUAL_TABLE_SHADOW_SUFFIXES = {
"fts3": ("_content", "_segdir", "_segments", "_stat", "_docsize"),
"fts4": ("_content", "_segdir", "_segments", "_stat", "_docsize"),
"fts5": ("_data", "_idx", "_docsize", "_content", "_config"),
"rtree": ("_node", "_parent", "_rowid"),
"rtree_i32": ("_node", "_parent", "_rowid"),
}
def sqlite_version(): def sqlite_version():
@ -36,5 +51,131 @@ def supports_table_xinfo():
return sqlite_version() >= (3, 26, 0) return sqlite_version() >= (3, 26, 0)
def supports_table_list():
return sqlite_version() >= (3, 37, 0)
def supports_generated_columns(): def supports_generated_columns():
return sqlite_version() >= (3, 31, 0) return sqlite_version() >= (3, 31, 0)
def sqlite_table_type(
conn,
table: str,
*,
schema: str | None = "main",
) -> SQLiteTableType | None:
if supports_table_list():
try:
query = "select type from pragma_table_list where name = ?"
params: tuple[str, ...] = (table,)
if schema is not None:
query += " and schema = ?"
params = (table, schema)
row = conn.execute(query, params).fetchone()
if row is not None and row[0] in {"table", "view", "virtual", "shadow"}:
return row[0]
except sqlite3.DatabaseError:
pass
return _sqlite_table_type_from_schema(conn, table, schema=schema)
def sqlite_hidden_table_names(conn, *, schema: str | None = "main") -> list[str]:
schema_table = _sqlite_schema_table(schema)
try:
rows = conn.execute(
"select name, sql from {} where type = 'table'".format(schema_table)
).fetchall()
except sqlite3.DatabaseError:
return []
hidden_tables = []
content_fts_tables = []
for name, sql in rows:
if (
name in {"sqlite_stat1", "sqlite_stat2", "sqlite_stat3", "sqlite_stat4"}
or name.startswith("_")
or sqlite_table_type(conn, name, schema=schema) == "shadow"
):
hidden_tables.append(name)
elif _is_fts_content_virtual_table(sql):
content_fts_tables.append(name)
return sorted(hidden_tables) + content_fts_tables
def _sqlite_table_type_from_schema(
conn,
table: str,
*,
schema: str | None = "main",
) -> SQLiteTableType | None:
schema_table = _sqlite_schema_table(schema)
try:
row = conn.execute(
"select type, sql from {} where name = ?".format(schema_table),
(table,),
).fetchone()
except sqlite3.DatabaseError:
return None
if row is None:
return None
object_type, sql = row
if object_type == "view":
return "view"
if object_type != "table":
return None
if _virtual_table_module(sql) is not None:
return "virtual"
if _is_known_shadow_table(conn, table, schema=schema):
return "shadow"
return "table"
def _is_known_shadow_table(
conn,
table: str,
*,
schema: str | None = "main",
) -> bool:
schema_table = _sqlite_schema_table(schema)
try:
rows = conn.execute(
"select name, sql from {} where type = 'table'".format(schema_table)
).fetchall()
except sqlite3.DatabaseError:
return False
for virtual_table, sql in rows:
module = _virtual_table_module(sql)
if module is None:
continue
for suffix in _VIRTUAL_TABLE_SHADOW_SUFFIXES.get(module, ()):
if table == virtual_table + suffix:
return True
return False
def _sqlite_schema_table(schema: str | None) -> str:
if schema is None or schema == "main":
return "sqlite_master"
if schema == "temp":
return "sqlite_temp_master"
return "{}.sqlite_master".format(_quote_identifier(schema))
def _quote_identifier(value: str) -> str:
return '"{}"'.format(value.replace('"', '""'))
def _virtual_table_module(sql: str | None) -> str | None:
if not sql:
return None
match = _VIRTUAL_TABLE_MODULE_RE.search(sql)
if match is None:
return None
return match.group(1).strip("\"'[]`").lower()
def _is_fts_content_virtual_table(sql: str | None) -> bool:
return (
_virtual_table_module(sql) in {"fts3", "fts4", "fts5"}
and "content=" in sql.lower()
)

View file

@ -14,6 +14,7 @@ from datasette.events import AlterTableEvent, CreateTableEvent, InsertRowsEvent
from datasette.database import QueryInterrupted from datasette.database import QueryInterrupted
from datasette.resources import DatabaseResource, QueryResource from datasette.resources import DatabaseResource, QueryResource
from datasette.stored_queries import stored_query_to_dict from datasette.stored_queries import stored_query_to_dict
from datasette.write_sql import QueryWriteRejected
from datasette.utils import ( from datasette.utils import (
add_cors_headers, add_cors_headers,
await_me_maybe, await_me_maybe,
@ -453,9 +454,24 @@ class QueryView(View):
): ):
raise Forbidden("You do not have permission to view this query") raise Forbidden("You do not have permission to view this query")
try:
await _ensure_stored_query_execution_permissions( await _ensure_stored_query_execution_permissions(
datasette, db, stored_query, request.actor datasette, db, stored_query, request.actor
) )
except QueryWriteRejected as ex:
if request.headers.get("accept") == "application/json" or request.args.get(
"_json"
):
return Response.json(
{
"ok": False,
"message": ex.message,
"redirect": None,
},
status=403,
)
datasette.add_message(request, ex.message, datasette.ERROR)
return Response.redirect(stored_query.on_error_redirect or request.path)
# If database is immutable, return an error # If database is immutable, return an error
if not db.is_mutable: if not db.is_mutable:

View file

@ -99,9 +99,7 @@ class ExecuteWriteView(BaseView):
"parameter_names": parameter_names, "parameter_names": parameter_names,
"parameter_values": parameter_values, "parameter_values": parameter_values,
"analysis_error": analysis_error, "analysis_error": analysis_error,
"analysis_rows": [ "analysis_rows": analysis_rows,
row for row in analysis_rows if row["operation"] != "read"
],
"execution_message": execution_message, "execution_message": execution_message,
"execution_links": execution_links, "execution_links": execution_links,
"execution_ok": execution_ok, "execution_ok": execution_ok,
@ -165,13 +163,15 @@ class ExecuteWriteView(BaseView):
except QueryValidationError as ex: except QueryValidationError as ex:
if _wants_json(request, is_json, data): if _wants_json(request, is_json, data):
return _block_framing(_error([ex.message], ex.status)) return _block_framing(_error([ex.message], ex.status))
if ex.flash:
self.ds.add_message(request, ex.message, self.ds.ERROR)
return await self._render_form( return await self._render_form(
request, request,
db, db,
sql=sql or "", sql=sql or "",
parameter_values=provided_params, parameter_values=provided_params,
analysis_error=ex.message, analysis_error=None if ex.flash else ex.message,
execution_message=ex.message, execution_message=None if ex.flash else ex.message,
execution_ok=False, execution_ok=False,
status=ex.status, status=ex.status,
) )
@ -193,6 +193,9 @@ class ExecuteWriteView(BaseView):
status=400, status=400,
) )
if cursor.rowcount == -1:
message = "Query executed"
else:
message = "Query executed, {} row{} affected".format( message = "Query executed, {} row{} affected".format(
cursor.rowcount, "" if cursor.rowcount == 1 else "s" cursor.rowcount, "" if cursor.rowcount == 1 else "s"
) )

View file

@ -1,8 +1,17 @@
import json import json
import re import re
from datasette.resources import DatabaseResource, TableResource from datasette.resources import DatabaseResource
from datasette.stored_queries import StoredQuery from datasette.stored_queries import (
StoredQuery,
)
from datasette.write_sql import (
IgnoreWriteSqlOperation,
QueryWriteRejected,
RequireWriteSqlPermissions,
decision_for_write_sql_operation,
operation_is_write,
)
from datasette.utils import ( from datasette.utils import (
named_parameters as derive_named_parameters, named_parameters as derive_named_parameters,
escape_sqlite, escape_sqlite,
@ -12,6 +21,7 @@ from datasette.utils import (
InvalidSql, InvalidSql,
) )
from datasette.utils.asgi import Forbidden from datasette.utils.asgi import Forbidden
from datasette.utils.sql_analysis import Operation, SQLAnalysis
_query_name_re = re.compile(r"^[^/\.\n]+$") _query_name_re = re.compile(r"^[^/\.\n]+$")
@ -41,9 +51,11 @@ _query_write_fields = {
class QueryValidationError(Exception): class QueryValidationError(Exception):
def __init__(self, message, status=400): def __init__(self, message, status=400, *, flash=False):
self.message = message self.message = message
self.status = status self.status = status
self.flash = flash
super().__init__(message)
def _actor_id(actor): def _actor_id(actor):
@ -123,11 +135,8 @@ def _coerce_query_parameters(value, derived):
return parameters return parameters
def _analysis_is_write(analysis): def _analysis_is_write(analysis: SQLAnalysis) -> bool:
return any( return any(operation_is_write(operation) for operation in analysis.operations)
access.operation in {"insert", "update", "delete"}
for access in analysis.table_accesses
)
def _block_framing(response): def _block_framing(response):
@ -191,6 +200,8 @@ async def _analyze_user_query(datasette, db, sql, *, actor):
await datasette.ensure_query_write_permissions( await datasette.ensure_query_write_permissions(
db.name, sql, actor=actor, analysis=analysis db.name, sql, actor=actor, analysis=analysis
) )
except QueryWriteRejected as ex:
raise QueryValidationError(ex.message, status=403, flash=True) from ex
except Forbidden as ex: except Forbidden as ex:
raise QueryValidationError(str(ex), status=403) from ex raise QueryValidationError(str(ex), status=403) from ex
else: else:
@ -201,34 +212,57 @@ async def _analyze_user_query(datasette, db, sql, *, actor):
return is_write, derived, analysis return is_write, derived, analysis
def _analysis_rows(analysis): def _display_operations(analysis: SQLAnalysis) -> list[Operation]:
write_actions = { operations = []
"insert": "insert-row", for operation in analysis.operations:
"update": "update-row", if isinstance(
"delete": "delete-row", decision_for_write_sql_operation(operation), IgnoreWriteSqlOperation
} ):
return [ continue
{ operations.append(operation)
"operation": access.operation, return operations
"database": access.database,
"table": access.table,
"required_permission": write_actions.get(access.operation, ""),
"source": access.source,
}
for access in analysis.table_accesses
]
async def _analysis_rows_with_permissions(datasette, analysis, actor): def _analysis_rows(analysis: SQLAnalysis) -> list[dict[str, object]]:
rows = _analysis_rows(analysis) rows = []
for row in rows: for operation in _display_operations(analysis):
permission = row["required_permission"] decision = decision_for_write_sql_operation(operation)
if permission: required_permission = (
row["allowed"] = await datasette.allowed( ", ".join(permission.action for permission in decision.permissions)
action=permission, if isinstance(decision, RequireWriteSqlPermissions)
resource=TableResource(row["database"], row["table"]), else ""
actor=actor,
) )
rows.append(
{
"operation": operation.operation,
"database": operation.database,
"table": operation.table or operation.target,
"required_permission": required_permission,
"source": operation.source,
}
)
return rows
async def _analysis_rows_with_permissions(
datasette, analysis: SQLAnalysis, actor
) -> list[dict[str, object]]:
rows = _analysis_rows(analysis)
is_write = _analysis_is_write(analysis)
for row, operation in zip(rows, _display_operations(analysis)):
decision = decision_for_write_sql_operation(operation)
if isinstance(decision, RequireWriteSqlPermissions):
row["allowed"] = True
for permission in decision.permissions:
if not await datasette.allowed(
action=permission.action,
resource=permission.resource,
actor=actor,
):
row["allowed"] = False
break
elif is_write:
row["allowed"] = False
else: else:
row["allowed"] = None row["allowed"] = None
return rows return rows
@ -277,6 +311,8 @@ async def _prepare_execute_write(datasette, db, sql, params, actor):
await datasette.ensure_query_write_permissions( await datasette.ensure_query_write_permissions(
db.name, sql, actor=actor, analysis=analysis db.name, sql, actor=actor, analysis=analysis
) )
except QueryWriteRejected as ex:
raise QueryValidationError(ex.message, status=403, flash=True) from ex
except Forbidden as ex: except Forbidden as ex:
raise QueryValidationError(str(ex), status=403) from ex raise QueryValidationError(str(ex), status=403) from ex
return parameter_names, params, analysis return parameter_names, params, analysis
@ -326,7 +362,7 @@ async def _execute_write_analysis_data(datasette, db, sql, actor):
"ok": analysis_error is None, "ok": analysis_error is None,
"parameters": parameter_names, "parameters": parameter_names,
"analysis_error": analysis_error, "analysis_error": analysis_error,
"analysis_rows": [row for row in analysis_rows if row["operation"] != "read"], "analysis_rows": analysis_rows,
"execute_disabled": bool( "execute_disabled": bool(
(not sql) (not sql)
or analysis_error or analysis_error
@ -340,6 +376,7 @@ async def _query_create_analysis_data(datasette, db, sql, actor):
parameter_names = [] parameter_names = []
analysis_rows = [] analysis_rows = []
analysis_error = None analysis_error = None
analysis: SQLAnalysis | None = None
if has_sql: if has_sql:
try: try:
parameter_names = _derived_query_parameters(sql) parameter_names = _derived_query_parameters(sql)
@ -356,9 +393,7 @@ async def _query_create_analysis_data(datasette, db, sql, actor):
"analysis_error": analysis_error, "analysis_error": analysis_error,
"analysis_rows": analysis_rows, "analysis_rows": analysis_rows,
"has_sql": has_sql, "has_sql": has_sql,
"analysis_is_write": bool( "analysis_is_write": _analysis_is_write(analysis) if analysis else False,
analysis_rows and any(row["required_permission"] for row in analysis_rows)
),
"save_disabled": bool( "save_disabled": bool(
(not has_sql) (not has_sql)
or analysis_error or analysis_error
@ -398,15 +433,19 @@ async def _inserted_row_url(datasette, db, analysis, cursor):
if lastrowid is None: if lastrowid is None:
return None return None
direct_inserts = [ direct_inserts = [
access operation
for access in analysis.table_accesses for operation in analysis.operations
if access.operation == "insert" if operation.operation == "insert"
and access.source is None and operation.target_type == "table"
and access.database == db.name and not operation.internal
and operation.source is None
and operation.database == db.name
] ]
if len(direct_inserts) != 1: if len(direct_inserts) != 1:
return None return None
table = direct_inserts[0].table table = direct_inserts[0].table
if table is None:
return None
pks = await db.primary_keys(table) pks = await db.primary_keys(table)
use_rowid = not pks use_rowid = not pks
select = ( select = (

253
datasette/write_sql.py Normal file
View file

@ -0,0 +1,253 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from .permissions import Resource
from .resources import DatabaseResource, TableResource
from .utils import named_parameters, sqlite3
from .utils.asgi import Forbidden
from .utils.sql_analysis import Operation, SQLAnalysis
if TYPE_CHECKING:
from .app import Datasette
class QueryWriteRejected(Exception):
def __init__(self, message: str):
self.message = message
super().__init__(message)
@dataclass(frozen=True)
class PermissionRequirement:
action: str
resource: Resource
PermissionRequirements = tuple[PermissionRequirement, ...]
class WriteSqlOperationDecision:
"""What Datasette should do with one operation in user-supplied write SQL."""
@dataclass(frozen=True)
class IgnoreWriteSqlOperation(WriteSqlOperationDecision):
reason: str
@dataclass(frozen=True)
class RequireWriteSqlPermissions(WriteSqlOperationDecision):
permissions: PermissionRequirements
@dataclass(frozen=True)
class RejectWriteSqlOperation(WriteSqlOperationDecision):
message: str
@dataclass(frozen=True)
class UnsupportedWriteSqlOperation(WriteSqlOperationDecision):
message: str
def row_mutation_requirements(database: str, table: str) -> PermissionRequirements:
resource = TableResource(database=database, table=table)
return tuple(
PermissionRequirement(action=action, resource=resource)
for action in ("insert-row", "update-row", "delete-row")
)
def decision_for_write_sql_operation(
operation: Operation,
) -> WriteSqlOperationDecision:
unsupported_message = (
f"Unsupported SQL operation: {operation.operation} {operation.target_type}"
)
if operation.internal:
return IgnoreWriteSqlOperation("internal SQLite operation")
if operation.operation == "select":
return IgnoreWriteSqlOperation("select statement")
if operation.operation == "vacuum":
return RejectWriteSqlOperation("VACUUM is not allowed in user-supplied SQL")
if operation.operation in {"insert", "update", "delete"}:
if operation.table_kind == "virtual":
return RejectWriteSqlOperation(
"Writes to virtual tables are not allowed in user-supplied SQL"
)
if operation.table_kind == "shadow":
return RejectWriteSqlOperation(
"Writes to shadow tables are not allowed in user-supplied SQL"
)
if operation.operation == "function":
return IgnoreWriteSqlOperation("SQL function")
if (
operation.operation == "read"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="view-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
if (
operation.operation in {"insert", "update"}
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
row_mutation_requirements(
database=operation.database,
table=operation.table,
)
)
if (
operation.operation == "delete"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="delete-row",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
if operation.operation == "create" and operation.target_type == "table":
if operation.database is None:
return UnsupportedWriteSqlOperation(unsupported_message)
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="create-table",
resource=DatabaseResource(database=operation.database),
),
)
)
if (
operation.operation == "alter"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="alter-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
if (
operation.operation == "drop"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="drop-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
if (
operation.operation in {"create", "drop"}
and operation.target_type == "index"
and operation.database is not None
and operation.table is not None
):
return RequireWriteSqlPermissions(
(
PermissionRequirement(
action="alter-table",
resource=TableResource(
database=operation.database, table=operation.table
),
),
)
)
return UnsupportedWriteSqlOperation(unsupported_message)
def operation_is_write(operation: Operation) -> bool:
return operation.operation in {
"insert",
"update",
"delete",
"create",
"alter",
"drop",
"begin",
"commit",
"rollback",
"savepoint",
"attach",
"detach",
"pragma",
"analyze",
"reindex",
"vacuum",
"unknown",
}
async def ensure_query_write_permissions(
datasette: Datasette,
database: str,
sql: str,
*,
actor: dict[str, object] | None = None,
params: dict[str, object] | None = None,
analysis: SQLAnalysis | None = None,
) -> SQLAnalysis:
db = datasette.get_database(database)
if analysis is None:
if params is None:
params = {name: "" for name in named_parameters(sql)}
try:
analysis = await db.analyze_sql(sql, params)
except sqlite3.DatabaseError as ex:
raise Forbidden(f"Could not analyze query: {ex}") from ex
for operation in analysis.operations:
decision = decision_for_write_sql_operation(operation)
if isinstance(decision, IgnoreWriteSqlOperation):
continue
if isinstance(decision, RejectWriteSqlOperation):
raise QueryWriteRejected(decision.message)
if isinstance(decision, UnsupportedWriteSqlOperation):
raise Forbidden(decision.message)
permissions = decision.permissions
if operation.database != database:
raise Forbidden("Writable queries may not access attached databases")
for permission in permissions:
if not await datasette.allowed(
action=permission.action,
resource=permission.resource,
actor=actor,
):
raise Forbidden(
f"Permission denied: need {permission.action} "
f"on {permission.resource}"
)
return analysis

View file

@ -1425,7 +1425,7 @@ See also :ref:`the default_allow_sql setting <setting_default_allow_sql>`.
execute-write-sql execute-write-sql
----------------- -----------------
Actor is allowed to run arbitrary writable SQL queries against a specific database, subject to table-level write permissions such as ``insert-row``, ``update-row`` and ``delete-row``. Actor is allowed to run arbitrary writable SQL queries against a specific database, subject to table-level write permissions such as ``insert-row``, ``update-row`` and ``delete-row``. SQL functions are allowed and are not separately restricted by Datasette permissions.
``resource`` - ``datasette.resources.DatabaseResource(database)`` ``resource`` - ``datasette.resources.DatabaseResource(database)``
``database`` is the name of the database (string) ``database`` is the name of the database (string)

View file

@ -24,6 +24,8 @@ Write SQL UI
- New "Write to this database" interface at ``/<database>/-/execute-write`` for running arbitrary writable SQL against mutable databases. The form extracts named parameters, analyzes the SQL, shows the table operations that will be attempted and links to a newly inserted row when a single-row insert succeeds. (:issue:`2742`) - New "Write to this database" interface at ``/<database>/-/execute-write`` for running arbitrary writable SQL against mutable databases. The form extracts named parameters, analyzes the SQL, shows the table operations that will be attempted and links to a newly inserted row when a single-row insert succeeds. (:issue:`2742`)
- Added the new :ref:`execute-write-sql <actions_execute_write_sql>` permission for running arbitrary writable SQL. Execution is also gated by table-level permissions such as :ref:`insert-row <actions_insert_row>`, :ref:`update-row <actions_update_row>` and :ref:`delete-row <actions_delete_row>`, and writes to attached databases are rejected. (:issue:`2742`) - Added the new :ref:`execute-write-sql <actions_execute_write_sql>` permission for running arbitrary writable SQL. Execution is also gated by table-level permissions such as :ref:`insert-row <actions_insert_row>`, :ref:`update-row <actions_update_row>` and :ref:`delete-row <actions_delete_row>`, and writes to attached databases are rejected. (:issue:`2742`)
- The write SQL analyzer now uses a deny-by-default model for unsupported operations. Reads from source tables require :ref:`view-table <actions_view_table>` permission, schema changes require :ref:`create-table <actions_create_table>`, :ref:`alter-table <actions_alter_table>` or :ref:`drop-table <actions_drop_table>` as appropriate, and row mutation statements require the full ``insert-row``, ``update-row`` and ``delete-row`` permission set. SQL functions are allowed and are not separately permission-gated. (:issue:`2748`)
- User-supplied write SQL now rejects ``VACUUM`` and writes to SQLite virtual tables or shadow tables. These restrictions also apply to untrusted stored write queries; trusted configured stored queries continue to skip these filters. (:issue:`2748`)
Plugin API changes Plugin API changes
~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~

View file

@ -505,6 +505,70 @@ The JSON write API
Datasette provides a write API for JSON data. This is a POST-only API that requires an authenticated API token, see :ref:`CreateTokenView`. The token will need to have the specified :ref:`authentication_permissions`. Datasette provides a write API for JSON data. This is a POST-only API that requires an authenticated API token, see :ref:`CreateTokenView`. The token will need to have the specified :ref:`authentication_permissions`.
.. _ExecuteWriteView:
Executing write SQL
~~~~~~~~~~~~~~~~~~~
Actors with the :ref:`actions_execute_write_sql` permission can execute arbitrary writable SQL against a mutable database using ``/-/execute-write``.
::
POST /<database>/-/execute-write
Content-Type: application/json
Authorization: Bearer dstok_<rest-of-token>
The request body must include a ``"sql"`` string. Named SQL parameters can be provided using the optional ``"params"`` object:
.. code-block:: json
{
"sql": "insert into dogs (name) values (:name)",
"params": {
"name": "Cleo"
}
}
The SQL must be writable. Read-only ``select`` queries should use the regular :ref:`custom SQL query API <sql>` instead.
Datasette analyzes the SQL before executing it. The actor must have ``execute-write-sql`` permission for the database, and must also have any permissions required by the operations in the SQL. For example, inserts and updates against a table require ``insert-row``, ``update-row`` and ``delete-row`` permissions for that table. Reads performed as part of the write, such as ``insert into dogs select ... from other_table``, require ``view-table`` permission on the source table. Schema changes require ``create-table``, ``alter-table`` or ``drop-table`` permissions as appropriate.
Unsupported SQL operations are rejected by default. ``VACUUM`` is not allowed in arbitrary write SQL, and writes to SQLite virtual tables or shadow tables are rejected. SQL functions are allowed and are not separately restricted by Datasette permissions.
A successful response includes a message, the SQLite ``rowcount`` and a summary of the operations that were executed:
The shape of the ``"analysis"`` block is not yet considered a stable API and may change in future Datasette releases.
.. code-block:: json
{
"ok": true,
"message": "Query executed, 1 row affected",
"rowcount": 1,
"analysis": [
{
"operation": "insert",
"database": "data",
"table": "dogs",
"required_permission": "insert-row, update-row, delete-row",
"source": null
}
]
}
If SQLite reports ``-1`` for the row count, the message will be ``"Query executed"``.
Errors use the standard Datasette error format:
.. code-block:: json
{
"ok": false,
"errors": [
"Permission denied: need execute-write-sql"
]
}
.. _TableInsertView: .. _TableInsertView:
Inserting rows Inserting rows

View file

@ -140,7 +140,7 @@ Datasette stores both configured queries and user-created queries in the ``queri
Stored queries created by users default to private. Private stored queries can only be viewed, updated or deleted by the actor that created them. Broad ``view-query``, ``update-query`` or ``delete-query`` permission grants still do not allow other actors to access another actor's private stored queries. Stored queries created by users default to private. Private stored queries can only be viewed, updated or deleted by the actor that created them. Broad ``view-query``, ``update-query`` or ``delete-query`` permission grants still do not allow other actors to access another actor's private stored queries.
Stored queries created by users are untrusted. This means they execute using the permissions of the actor who runs them, as if that actor had pasted the SQL into the regular custom SQL interface or write SQL interface. Read-only stored queries require ``execute-sql``. Writable stored queries require ``execute-write-sql`` plus the relevant table-level write permissions. Stored queries created by users are untrusted. This means they execute using the permissions of the actor who runs them, as if that actor had pasted the SQL into the regular custom SQL interface or write SQL interface. Read-only stored queries require ``execute-sql``. Writable stored queries require ``execute-write-sql`` plus the relevant table-level write permissions. SQL functions are allowed and are not separately restricted by Datasette permissions.
.. _trusted_stored_queries: .. _trusted_stored_queries:
.. _trusted_saved_queries: .. _trusted_saved_queries:

View file

@ -12,10 +12,22 @@ import pytest
import pytest_asyncio import pytest_asyncio
from datasette.app import Datasette from datasette.app import Datasette
from datasette.permissions import PermissionSQL from datasette.permissions import PermissionSQL
from datasette.resources import TableResource from datasette.resources import DatabaseResource, QueryResource, TableResource
from datasette import hookimpl from datasette import hookimpl
def test_resource_string_representations():
assert str(DatabaseResource("content")) == "content"
assert repr(DatabaseResource("content")) == (
"DatabaseResource(parent='content', child=None)"
)
assert str(TableResource("content", "dogs")) == "content/dogs"
assert repr(TableResource("content", "dogs")) == (
"TableResource(parent='content', child='dogs')"
)
assert str(QueryResource("content", "insert-a-dog")) == "content/insert-a-dog"
# Test plugin that provides permission rules # Test plugin that provides permission rules
class PermissionRulesPlugin: class PermissionRulesPlugin:
def __init__(self, rules_callback): def __init__(self, rules_callback):

View file

@ -8,7 +8,7 @@ from datasette.app import Datasette
from datasette.database import Database, Results, MultipleValues from datasette.database import Database, Results, MultipleValues
from datasette.database import DatasetteClosedError from datasette.database import DatasetteClosedError
from datasette.database import _deliver_write_result from datasette.database import _deliver_write_result
from datasette.utils.sqlite import sqlite3, sqlite_version from datasette.utils.sqlite import sqlite3
from datasette.utils import Column from datasette.utils import Column
import pytest import pytest
import time import time
@ -698,14 +698,17 @@ async def test_analyze_sql():
assert [ assert [
( (
access.operation, operation.operation,
access.database, operation.database,
access.sqlite_schema, operation.sqlite_schema,
access.table, operation.table,
access.columns, operation.columns,
access.source, operation.source,
) )
for access in analysis.table_accesses for operation in analysis.operations
if operation.target_type == "table"
and operation.operation in {"read", "insert", "update", "delete"}
and not operation.internal
] == [ ] == [
("read", "data", "main", "dogs", ("id", "name"), None), ("read", "data", "main", "dogs", ("id", "name"), None),
] ]
@ -722,14 +725,17 @@ async def test_analyze_sql_insert_select():
assert { assert {
( (
access.operation, operation.operation,
access.database, operation.database,
access.sqlite_schema, operation.sqlite_schema,
access.table, operation.table,
access.columns, operation.columns,
access.source, operation.source,
) )
for access in analysis.table_accesses for operation in analysis.operations
if operation.target_type == "table"
and operation.operation in {"read", "insert", "update", "delete"}
and not operation.internal
} == { } == {
("insert", "data", "main", "dogs", (), None), ("insert", "data", "main", "dogs", (), None),
("read", "data", "main", "cats", ("name",), None), ("read", "data", "main", "cats", ("name",), None),
@ -792,14 +798,7 @@ async def test_in_memory_databases_forbid_writes(app_client):
assert await db.table_names() == ["foo"] assert await db.table_names() == ["foo"]
def pragma_table_list_supported():
return sqlite_version()[1] >= 37
@pytest.mark.asyncio @pytest.mark.asyncio
@pytest.mark.skipif(
not pragma_table_list_supported(), reason="Requires PRAGMA table_list support"
)
async def test_hidden_tables(app_client): async def test_hidden_tables(app_client):
ds = app_client.ds ds = app_client.ds
db = ds.add_database(Database(ds, is_memory=True, is_mutable=True)) db = ds.add_database(Database(ds, is_memory=True, is_mutable=True))

File diff suppressed because it is too large Load diff

View file

@ -5,7 +5,7 @@ Tests for various datasette helper functions.
from datasette.app import Datasette from datasette.app import Datasette
from datasette import utils from datasette import utils
from datasette.utils.asgi import Request from datasette.utils.asgi import Request
from datasette.utils.sqlite import sqlite3 from datasette.utils.sqlite import sqlite3, sqlite_hidden_table_names, sqlite_table_type
import json import json
import os import os
import pathlib import pathlib
@ -226,6 +226,73 @@ def test_detect_fts_different_table_names(table):
conn.close() conn.close()
@pytest.mark.parametrize("use_fallback", (False, True))
def test_sqlite_table_type_detects_virtual_and_shadow_tables(monkeypatch, use_fallback):
if use_fallback:
monkeypatch.setattr("datasette.utils.sqlite.sqlite_version", lambda: (3, 25, 0))
conn = utils.sqlite3.connect(":memory:")
try:
conn.executescript("""
create table dogs(id integer primary key, name text);
create view dog_names as select name from dogs;
create virtual table search_index using fts5(title, body);
create virtual table boxes using rtree(id, minx, maxx, miny, maxy);
""")
assert sqlite_table_type(conn, "dogs") == "table"
assert sqlite_table_type(conn, "dog_names") == "view"
assert sqlite_table_type(conn, "search_index") == "virtual"
assert sqlite_table_type(conn, "search_index_config") == "shadow"
assert sqlite_table_type(conn, "boxes") == "virtual"
assert sqlite_table_type(conn, "boxes_node") == "shadow"
assert sqlite_table_type(conn, "missing") is None
assert sqlite_hidden_table_names(conn) == [
"boxes_node",
"boxes_parent",
"boxes_rowid",
"search_index_config",
"search_index_content",
"search_index_data",
"search_index_docsize",
"search_index_idx",
]
finally:
conn.close()
@pytest.mark.parametrize("use_fallback", (False, True))
def test_sqlite_table_type_detects_attached_database_tables(monkeypatch, use_fallback):
if use_fallback:
monkeypatch.setattr("datasette.utils.sqlite.sqlite_version", lambda: (3, 25, 0))
conn = utils.sqlite3.connect(":memory:")
try:
conn.executescript("""
attach database ':memory:' as extra;
create table extra.cats(id integer primary key, name text);
create virtual table extra.cat_search using fts5(name);
""")
assert sqlite_table_type(conn, "cats", schema="extra") == "table"
assert sqlite_table_type(conn, "cat_search", schema="extra") == "virtual"
assert sqlite_table_type(conn, "cat_search_data", schema="extra") == "shadow"
finally:
conn.close()
def test_sqlite_hidden_table_names_hides_multiline_content_fts_table():
conn = utils.sqlite3.connect(":memory:")
try:
conn.executescript("""
create table searchable(id integer primary key, body text);
create virtual table searchable_fts
using fts5(body, content='searchable', content_rowid='id');
""")
assert "searchable_fts" in sqlite_hidden_table_names(conn)
finally:
conn.close()
@pytest.mark.parametrize( @pytest.mark.parametrize(
"url,expected", "url,expected",
[ [

View file

@ -26,17 +26,20 @@ def conn():
conn.close() conn.close()
def as_tuples(analysis): def table_operation_tuples(analysis):
return [ return [
( (
access.operation, operation.operation,
access.database, operation.database,
access.sqlite_schema, operation.sqlite_schema,
access.table, operation.table,
access.columns, operation.columns,
access.source, operation.source,
) )
for access in analysis.table_accesses for operation in analysis.operations
if operation.target_type == "table"
and operation.operation in {"read", "insert", "update", "delete"}
and not operation.internal
] ]
@ -48,7 +51,7 @@ def test_analyze_select_tables(conn):
database_name="data", database_name="data",
) )
assert set(as_tuples(analysis)) == { assert set(table_operation_tuples(analysis)) == {
("read", "data", "main", "cats", ("id", "name"), None), ("read", "data", "main", "cats", ("id", "name"), None),
("read", "data", "main", "dogs", ("age", "id", "name"), None), ("read", "data", "main", "dogs", ("age", "id", "name"), None),
} }
@ -57,11 +60,278 @@ def test_analyze_select_tables(conn):
def test_analyze_uses_sqlite_schema_as_default_database(conn): def test_analyze_uses_sqlite_schema_as_default_database(conn):
analysis = analyze_sql_tables(conn, "select name from dogs") analysis = analyze_sql_tables(conn, "select name from dogs")
assert set(as_tuples(analysis)) == { assert set(table_operation_tuples(analysis)) == {
("read", "main", "main", "dogs", ("name",), None), ("read", "main", "main", "dogs", ("name",), None),
} }
def test_analyze_user_schema_table_read_is_not_internal(conn):
analysis = analyze_sql_tables(
conn,
"insert into log select sql from sqlite_master where name = 'dogs'",
database_name="data",
)
assert {
"operation": "read",
"target_type": "schema",
"database": "data",
"sqlite_schema": "main",
"table": None,
"target": "sqlite_master",
"columns": ("name", "sql"),
"source": None,
"internal": False,
} in [operation_dict(operation) for operation in analysis.operations]
def operation_dict(operation):
return {
"operation": operation.operation,
"target_type": operation.target_type,
"database": operation.database,
"sqlite_schema": operation.sqlite_schema,
"table": operation.table,
"target": operation.target,
"columns": operation.columns,
"source": operation.source,
"internal": operation.internal,
}
def test_analyze_create_table_operation():
conn = sqlite3.connect(":memory:")
try:
analysis = analyze_sql_tables(
conn,
"create table foobar (id integer primary key, name text)",
database_name="data",
)
finally:
conn.close()
assert {
"operation": "create",
"target_type": "table",
"database": "data",
"sqlite_schema": "main",
"table": "foobar",
"target": "foobar",
"columns": (),
"source": None,
"internal": False,
} in [operation_dict(operation) for operation in analysis.operations]
assert not [
operation
for operation in analysis.operations
if operation.table in {"sqlite_master", "sqlite_schema"}
and not operation.internal
]
def test_analyze_vacuum_operation():
conn = sqlite3.connect(":memory:")
try:
analysis = analyze_sql_tables(conn, "vacuum", database_name="data")
finally:
conn.close()
assert [operation_dict(operation) for operation in analysis.operations] == [
{
"operation": "vacuum",
"target_type": "database",
"database": "data",
"sqlite_schema": "main",
"table": None,
"target": "data",
"columns": (),
"source": None,
"internal": False,
}
]
def test_analyze_statement_with_no_authorizer_callbacks_is_unknown():
conn = sqlite3.connect(":memory:")
try:
analysis = analyze_sql_tables(conn, "reindex", database_name="data")
finally:
conn.close()
assert [operation_dict(operation) for operation in analysis.operations] == [
{
"operation": "unknown",
"target_type": "statement",
"database": "data",
"sqlite_schema": None,
"table": None,
"target": None,
"columns": (),
"source": None,
"internal": False,
}
]
def test_analyze_transaction_operation(conn):
analysis = analyze_sql_tables(conn, "commit", database_name="data")
assert [operation_dict(operation) for operation in analysis.operations] == [
{
"operation": "commit",
"target_type": "transaction",
"database": None,
"sqlite_schema": None,
"table": None,
"target": "COMMIT",
"columns": (),
"source": None,
"internal": False,
}
]
def test_analyze_savepoint_operation(conn):
analysis = analyze_sql_tables(conn, "savepoint s", database_name="data")
assert [operation_dict(operation) for operation in analysis.operations] == [
{
"operation": "savepoint",
"target_type": "transaction",
"database": None,
"sqlite_schema": None,
"table": None,
"target": "BEGIN s",
"columns": (),
"source": None,
"internal": False,
}
]
def test_analyze_function_operation(conn):
analysis = analyze_sql_tables(
conn,
"insert into dogs (name) values (upper(:name))",
{"name": "Cleo"},
database_name="data",
)
assert {
(
operation.operation,
operation.target_type,
operation.target,
operation.database,
operation.table,
)
for operation in analysis.operations
} == {
("insert", "table", "dogs", "data", "dogs"),
("function", "function", "upper", None, None),
("read", "table", "dogs", "data", "dogs"),
("update", "table", "cats", "data", "cats"),
("read", "table", "cats", "data", "cats"),
("insert", "table", "log", "data", "log"),
}
def test_analyze_create_virtual_table_operation():
conn = sqlite3.connect(":memory:")
try:
analysis = analyze_sql_tables(
conn,
"create virtual table docs using fts5(body)",
database_name="data",
)
finally:
conn.close()
assert {
"operation": "create",
"target_type": "virtual-table",
"database": "data",
"sqlite_schema": "main",
"table": "docs",
"target": "docs",
"columns": (),
"source": None,
"internal": False,
} in [operation_dict(operation) for operation in analysis.operations]
def test_analyze_table_kind_for_regular_virtual_and_shadow_tables():
conn = sqlite3.connect(":memory:")
try:
conn.executescript("""
create table dogs (id integer primary key, name text);
create virtual table docs using fts5(title, body, content='');
""")
regular_analysis = analyze_sql_tables(
conn,
"insert into dogs (name) values ('Cleo')",
database_name="data",
)
virtual_analysis = analyze_sql_tables(
conn,
"insert into docs(docs) values('delete-all')",
database_name="data",
)
shadow_analysis = analyze_sql_tables(
conn,
"insert into docs_config(k, v) values ('x', 1)",
database_name="data",
)
finally:
conn.close()
regular_insert = next(
operation
for operation in regular_analysis.operations
if operation.operation == "insert" and operation.table == "dogs"
)
virtual_insert = next(
operation
for operation in virtual_analysis.operations
if operation.operation == "insert" and operation.table == "docs"
)
shadow_insert = next(
operation
for operation in shadow_analysis.operations
if operation.operation == "insert" and operation.table == "docs_config"
)
assert regular_insert.table_kind == "table"
assert virtual_insert.table_kind == "virtual"
assert shadow_insert.table_kind == "shadow"
def test_analyze_create_table_as_select_function_is_not_internal():
conn = sqlite3.connect(":memory:")
try:
conn.execute("create table secret(value text)")
analysis = analyze_sql_tables(
conn,
"create table copied as select substr(value, 1, 1) from secret",
database_name="data",
)
finally:
conn.close()
assert {
"operation": "function",
"target_type": "function",
"database": None,
"sqlite_schema": None,
"table": None,
"target": "substr",
"columns": (),
"source": None,
"internal": False,
} in [operation_dict(operation) for operation in analysis.operations]
def test_analyze_insert_tables(conn): def test_analyze_insert_tables(conn):
analysis = analyze_sql_tables( analysis = analyze_sql_tables(
conn, conn,
@ -70,7 +340,7 @@ def test_analyze_insert_tables(conn):
database_name="data", database_name="data",
) )
assert set(as_tuples(analysis)) == { assert set(table_operation_tuples(analysis)) == {
("insert", "data", "main", "dogs", (), None), ("insert", "data", "main", "dogs", (), None),
("read", "data", "main", "dogs", ("id", "name"), "dogs_after_insert"), ("read", "data", "main", "dogs", ("id", "name"), "dogs_after_insert"),
("update", "data", "main", "cats", ("name",), "dogs_after_insert"), ("update", "data", "main", "cats", ("name",), "dogs_after_insert"),
@ -87,7 +357,7 @@ def test_analyze_update_tables(conn):
database_name="data", database_name="data",
) )
assert set(as_tuples(analysis)) == { assert set(table_operation_tuples(analysis)) == {
("update", "data", "main", "dogs", ("age",), None), ("update", "data", "main", "dogs", ("age",), None),
("read", "data", "main", "dogs", ("age", "name"), None), ("read", "data", "main", "dogs", ("age", "name"), None),
} }
@ -101,7 +371,7 @@ def test_analyze_delete_tables(conn):
database_name="data", database_name="data",
) )
assert set(as_tuples(analysis)) == { assert set(table_operation_tuples(analysis)) == {
("delete", "data", "main", "dogs", (), None), ("delete", "data", "main", "dogs", (), None),
("read", "data", "main", "dogs", ("name",), None), ("read", "data", "main", "dogs", ("name",), None),
} }
@ -121,7 +391,7 @@ def test_analyze_insert_select_with_cte(conn):
database_name="data", database_name="data",
) )
assert set(as_tuples(analysis)) == { assert set(table_operation_tuples(analysis)) == {
("insert", "data", "main", "cats", (), None), ("insert", "data", "main", "cats", (), None),
("read", "data", "main", "dogs", ("age", "name"), "old_dogs"), ("read", "data", "main", "dogs", ("age", "name"), "old_dogs"),
} }
@ -135,7 +405,7 @@ def test_analyze_view_with_instead_of_trigger(conn):
database_name="data", database_name="data",
) )
assert set(as_tuples(analysis)) == { assert set(table_operation_tuples(analysis)) == {
("update", "data", "main", "dog_names", ("name",), None), ("update", "data", "main", "dog_names", ("name",), None),
("read", "data", "main", "dogs", ("id", "name"), "dog_names"), ("read", "data", "main", "dogs", ("id", "name"), "dog_names"),
("read", "data", "main", "dog_names", ("id", "name"), "dog_names"), ("read", "data", "main", "dog_names", ("id", "name"), "dog_names"),
@ -163,7 +433,7 @@ def test_analyze_attached_database_tables(conn):
schema_to_database={"extra": "extra_db"}, schema_to_database={"extra": "extra_db"},
) )
assert set(as_tuples(analysis)) == { assert set(table_operation_tuples(analysis)) == {
("insert", "extra_db", "extra", "people", (), None), ("insert", "extra_db", "extra", "people", (), None),
("read", "data", "main", "dogs", ("name",), None), ("read", "data", "main", "dogs", ("name",), None),
} }

68
tests/test_write_sql.py Normal file
View file

@ -0,0 +1,68 @@
from datasette.utils.sql_analysis import Operation
from datasette.write_sql import (
IgnoreWriteSqlOperation,
RejectWriteSqlOperation,
RequireWriteSqlPermissions,
UnsupportedWriteSqlOperation,
WriteSqlOperationDecision,
decision_for_write_sql_operation,
)
def test_decision_for_write_sql_operation_ignores_internal_and_select_operations():
internal_decision = decision_for_write_sql_operation(
Operation("read", "schema", None, None, "main", internal=True)
)
select_decision = decision_for_write_sql_operation(
Operation("select", "statement", None, None, None)
)
assert isinstance(internal_decision, IgnoreWriteSqlOperation)
assert isinstance(internal_decision, WriteSqlOperationDecision)
assert isinstance(select_decision, IgnoreWriteSqlOperation)
assert isinstance(select_decision, WriteSqlOperationDecision)
def test_decision_for_write_sql_operation_requires_table_write_permissions():
decision = decision_for_write_sql_operation(
Operation("insert", "table", "data", "dogs", None)
)
assert isinstance(decision, RequireWriteSqlPermissions)
assert [permission.action for permission in decision.permissions] == [
"insert-row",
"update-row",
"delete-row",
]
assert [str(permission.resource) for permission in decision.permissions] == [
"data/dogs",
"data/dogs",
"data/dogs",
]
def test_decision_for_write_sql_operation_rejects_vacuum():
decision = decision_for_write_sql_operation(
Operation("vacuum", "statement", None, None, None)
)
assert isinstance(decision, RejectWriteSqlOperation)
assert decision.message == "VACUUM is not allowed in user-supplied SQL"
def test_decision_for_write_sql_operation_ignores_functions():
decision = decision_for_write_sql_operation(
Operation("function", "function", None, None, None, target="upper")
)
assert isinstance(decision, IgnoreWriteSqlOperation)
assert decision.reason == "SQL function"
def test_decision_for_write_sql_operation_reports_unsupported_operations():
decision = decision_for_write_sql_operation(
Operation("unknown", "unknown", None, None, None)
)
assert isinstance(decision, UnsupportedWriteSqlOperation)
assert decision.message == "Unsupported SQL operation: unknown unknown"