Compare commits

...

2 commits

Author SHA1 Message Date
Simon Willison
737ff03efb Expanded analysis of SQL operations, refs #2748 2026-05-26 22:11:35 -07:00
Simon Willison
9f66cf72c1 Removed execute write SQL from query create page 2026-05-26 21:42:50 -07:00
10 changed files with 740 additions and 127 deletions

View file

@ -58,6 +58,16 @@ class Resource(ABC):
self.child = child
self._private = None # Sentinel to track if private was set
def __str__(self) -> str:
return "/".join(
str(part) for part in (self.parent, self.child) if part is not None
)
def __repr__(self) -> str:
return "{}(parent={!r}, child={!r})".format(
self.__class__.__name__, self.parent, self.child
)
@property
def private(self) -> bool:
"""

View file

@ -2,11 +2,16 @@ from __future__ import annotations
from dataclasses import dataclass
import json
from typing import Any, Iterable
from typing import Any, Iterable, TYPE_CHECKING
from .resources import TableResource
from .resources import DatabaseResource, TableResource
from .permissions import Resource
from .utils import named_parameters, sqlite3, tilde_encode, urlsafe_components
from .utils.asgi import Forbidden
from .utils.sql_analysis import Operation, SQLAnalysis
if TYPE_CHECKING:
from .app import Datasette
UNCHANGED = object()
@ -583,20 +588,94 @@ async def list_queries(
)
async def ensure_query_write_permissions(
datasette: Any,
database: str,
sql: str,
*,
actor: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
analysis: Any = None,
) -> Any:
PermissionRequirement = tuple[str, Resource]
def permission_for_operation(operation: Operation) -> PermissionRequirement | None:
write_actions = {
"insert": "insert-row",
"update": "update-row",
"delete": "delete-row",
}
action = write_actions.get(operation.operation)
if (
action
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return (
action,
TableResource(database=operation.database, table=operation.table),
)
if operation.operation == "create" and operation.target_type == "table":
if operation.database is None:
return None
return (
"create-table",
DatabaseResource(database=operation.database),
)
if (
operation.operation == "alter"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return (
"alter-table",
TableResource(database=operation.database, table=operation.table),
)
if (
operation.operation == "drop"
and operation.target_type == "table"
and operation.database is not None
and operation.table is not None
):
return (
"drop-table",
TableResource(database=operation.database, table=operation.table),
)
if (
operation.operation in {"create", "drop"}
and operation.target_type == "index"
and operation.database is not None
and operation.table is not None
):
return (
"alter-table",
TableResource(database=operation.database, table=operation.table),
)
return None
def operation_is_write(operation: Operation) -> bool:
return operation.operation in {
"insert",
"update",
"delete",
"create",
"alter",
"drop",
"begin",
"commit",
"rollback",
"attach",
"detach",
"pragma",
"analyze",
"reindex",
}
async def ensure_query_write_permissions(
datasette: Datasette,
database: str,
sql: str,
*,
actor: dict[str, object] | None = None,
params: dict[str, object] | None = None,
analysis: SQLAnalysis | None = None,
) -> SQLAnalysis:
db = datasette.get_database(database)
if analysis is None:
if params is None:
@ -606,18 +685,38 @@ async def ensure_query_write_permissions(
except sqlite3.DatabaseError as ex:
raise Forbidden(f"Could not analyze query: {ex}") from ex
for access in analysis.table_accesses:
action = write_actions.get(access.operation)
if action is None:
has_semantic_schema_operation = any(
operation.operation in {"create", "alter", "drop"}
and operation.target_type in {"table", "index", "view", "trigger"}
for operation in analysis.operations
)
for operation in analysis.operations:
if operation.internal and has_semantic_schema_operation:
continue
if access.database != database:
if has_semantic_schema_operation and operation.operation in {
"read",
"insert",
"update",
"delete",
"reindex",
}:
continue
permission = permission_for_operation(operation)
if permission is None:
if operation_is_write(operation):
raise Forbidden(
"Unsupported SQL operation: {} {}".format(
operation.operation, operation.target_type
)
)
continue
action, resource = permission
if operation.database != database:
raise Forbidden("Writable queries may not write to attached databases")
if not await datasette.allowed(
action=action,
resource=TableResource(database=access.database, table=access.table),
resource=resource,
actor=actor,
):
raise Forbidden(
f"Permission denied: need {action} on {access.database}/{access.table}"
)
raise Forbidden(f"Permission denied: need {action} on {resource}")
return analysis

View file

@ -106,9 +106,6 @@ form.sql .query-create-sql textarea#sql-editor {
.query-create-analysis-note {
margin: 0;
}
.query-create-action {
margin: 0.35rem 0 1rem;
}
.query-create-analysis {
margin-top: 0.8rem;
}
@ -171,10 +168,6 @@ form.sql .query-create-sql textarea#sql-editor {
<label><input type="checkbox" name="is_private" value="1"{% if is_private %} checked{% endif %}> Private</label>
<span class="query-create-option-note">Queries marked private can only be seen by you, their creator.</span>
</p>
{% if sql and analysis_is_write %}
<p class="query-create-action"><a href="{{ urls.database(database) }}/-/execute-write?{{ {'sql': sql}|urlencode|safe }}">Execute write SQL</a></p>
{% endif %}
<p class="query-create-submit"><input type="submit" value="Save query" data-query-create-submit{% if save_disabled %} disabled{% endif %}></p>
<div class="query-create-analysis" id="query-create-analysis-section"{% if not has_sql %} hidden{% endif %}>

View file

@ -3,22 +3,66 @@ from typing import Literal
from datasette.utils.sqlite import sqlite3
SQLOperation = Literal[
"read",
"insert",
"update",
"delete",
"create",
"alter",
"drop",
"begin",
"commit",
"rollback",
"attach",
"detach",
"pragma",
"analyze",
"reindex",
]
SQLTargetType = Literal[
"table",
"index",
"view",
"trigger",
"schema",
"transaction",
"database",
"pragma",
"unknown",
]
SQLTableOperation = Literal["read", "insert", "update", "delete"]
@dataclass(frozen=True)
class SQLTableAccess:
operation: SQLTableOperation
class Operation:
operation: SQLOperation
target_type: SQLTargetType
database: str | None
table: str
table: str | None
sqlite_schema: str | None
target: str | None = None
columns: tuple[str, ...] = ()
source: str | None = None
internal: bool = False
@dataclass(frozen=True)
class SQLAnalysis:
table_accesses: tuple[SQLTableAccess, ...]
operations: tuple[Operation, ...]
# Hashable dict key for grouping repeated authorizer callbacks while collecting columns.
@dataclass(frozen=True)
class OperationKey:
operation: SQLOperation
target_type: SQLTargetType
database: str | None
table: str | None
sqlite_schema: str | None
target: str | None
source: str | None
internal: bool
_ACTION_TO_OPERATION: dict[int, SQLTableOperation] = {
@ -28,6 +72,36 @@ _ACTION_TO_OPERATION: dict[int, SQLTableOperation] = {
sqlite3.SQLITE_DELETE: "delete",
}
# Values are (operation, target_type) pairs used to construct Operation objects.
_CREATE_ACTIONS = {
sqlite3.SQLITE_CREATE_INDEX: ("create", "index"),
sqlite3.SQLITE_CREATE_TABLE: ("create", "table"),
sqlite3.SQLITE_CREATE_TRIGGER: ("create", "trigger"),
sqlite3.SQLITE_CREATE_VIEW: ("create", "view"),
}
_DROP_ACTIONS = {
sqlite3.SQLITE_DROP_INDEX: ("drop", "index"),
sqlite3.SQLITE_DROP_TABLE: ("drop", "table"),
sqlite3.SQLITE_DROP_TRIGGER: ("drop", "trigger"),
sqlite3.SQLITE_DROP_VIEW: ("drop", "view"),
}
for action_name, operation, target_type in (
("SQLITE_CREATE_TEMP_INDEX", "create", "index"),
("SQLITE_CREATE_TEMP_TABLE", "create", "table"),
("SQLITE_CREATE_TEMP_TRIGGER", "create", "trigger"),
("SQLITE_CREATE_TEMP_VIEW", "create", "view"),
("SQLITE_DROP_TEMP_INDEX", "drop", "index"),
("SQLITE_DROP_TEMP_TABLE", "drop", "table"),
("SQLITE_DROP_TEMP_TRIGGER", "drop", "trigger"),
("SQLITE_DROP_TEMP_VIEW", "drop", "view"),
):
action_value = getattr(sqlite3, action_name, None)
if action_value is not None:
actions = _CREATE_ACTIONS if operation == "create" else _DROP_ACTIONS
actions[action_value] = (operation, target_type)
_SQLITE_SCHEMA_TABLES = {"sqlite_master", "sqlite_schema"}
def analyze_sql_tables(
conn,
@ -38,15 +112,13 @@ def analyze_sql_tables(
schema_to_database: dict[str, str] | None = None,
) -> SQLAnalysis:
"""
Return tables accessed by a SQL statement according to SQLite's authorizer.
Return operations performed by a SQL statement according to SQLite's authorizer.
This function is synchronous and connection-based. It temporarily installs a
SQLite authorizer, prepares ``EXPLAIN <sql>``, and returns the table access
SQLite authorizer, prepares ``EXPLAIN <sql>``, and returns the operation
callbacks observed while SQLite compiles the statement.
"""
accesses: dict[
tuple[SQLTableOperation, str | None, str, str | None, str | None], set[str]
] = {}
operations: dict[OperationKey, set[str]] = {}
def database_for_schema(sqlite_schema):
if schema_to_database and sqlite_schema in schema_to_database:
@ -55,21 +127,166 @@ def analyze_sql_tables(
return database_name
return sqlite_schema
def record(
operation: SQLOperation,
target_type: SQLTargetType,
*,
database: str | None,
table: str | None,
sqlite_schema: str | None,
target: str | None,
source: str | None,
column: str | None = None,
internal: bool = False,
):
key = OperationKey(
operation=operation,
target_type=target_type,
database=database,
table=table,
sqlite_schema=sqlite_schema,
target=target,
source=source,
internal=internal,
)
columns = operations.setdefault(key, set())
if column is not None:
columns.add(column)
def authorizer(action, arg1, arg2, sqlite_schema, source):
operation = _ACTION_TO_OPERATION.get(action)
if operation is None or arg1 is None:
if operation is not None and arg1 is not None:
target_type = "schema" if arg1 in _SQLITE_SCHEMA_TABLES else "table"
column = (
arg2 if operation in ("read", "update") and arg2 is not None else None
)
record(
operation,
target_type,
database=database_for_schema(sqlite_schema),
table=arg1 if target_type == "table" else None,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
column=column,
internal=target_type == "schema",
)
return sqlite3.SQLITE_OK
create_operation = _CREATE_ACTIONS.get(action)
if create_operation is not None and arg1 is not None:
operation, target_type = create_operation
related_table = arg2 if target_type in {"index", "trigger"} else arg1
record(
operation,
target_type,
database=database_for_schema(sqlite_schema),
table=related_table,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
drop_operation = _DROP_ACTIONS.get(action)
if drop_operation is not None and arg1 is not None:
operation, target_type = drop_operation
related_table = arg2 if target_type in {"index", "trigger"} else arg1
record(
operation,
target_type,
database=database_for_schema(sqlite_schema),
table=related_table,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_ALTER_TABLE and arg2 is not None:
record(
"alter",
"table",
database=database_for_schema(arg1),
table=arg2,
sqlite_schema=arg1,
target=arg2,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_TRANSACTION and arg1 is not None:
record(
arg1.lower(),
"transaction",
database=None,
table=None,
sqlite_schema=None,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_ATTACH and arg1 is not None:
record(
"attach",
"database",
database=None,
table=None,
sqlite_schema=None,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_DETACH and arg1 is not None:
record(
"detach",
"database",
database=None,
table=None,
sqlite_schema=None,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_PRAGMA and arg1 is not None:
record(
"pragma",
"pragma",
database=None,
table=None,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_ANALYZE:
record(
"analyze",
"database" if arg1 is None else "table",
database=database_for_schema(sqlite_schema),
table=arg1,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
if action == sqlite3.SQLITE_REINDEX and arg1 is not None:
record(
"reindex",
"index",
database=database_for_schema(sqlite_schema),
table=None,
sqlite_schema=sqlite_schema,
target=arg1,
source=source,
)
return sqlite3.SQLITE_OK
key = (
operation,
database_for_schema(sqlite_schema),
arg1,
sqlite_schema,
source,
)
columns = accesses.setdefault(key, set())
if operation in ("read", "update") and arg2 is not None:
columns.add(arg2)
return sqlite3.SQLITE_OK
conn.set_authorizer(authorizer)
@ -78,22 +295,26 @@ def analyze_sql_tables(
finally:
conn.set_authorizer(None)
has_schema_operation = any(
key.target_type in {"table", "index", "view", "trigger"}
and key.operation in {"create", "alter", "drop"}
for key in operations
)
return SQLAnalysis(
table_accesses=tuple(
SQLTableAccess(
operation=operation,
database=database,
table=table,
sqlite_schema=sqlite_schema,
operations=tuple(
Operation(
operation=key.operation,
target_type=key.target_type,
database=key.database,
table=key.table,
sqlite_schema=key.sqlite_schema,
target=key.target,
columns=tuple(sorted(columns)),
source=source,
source=key.source,
internal=key.internal
or (has_schema_operation and key.target_type == "schema"),
)
for (
operation,
database,
table,
sqlite_schema,
source,
), columns in accesses.items()
for key, columns in operations.items()
)
)

View file

@ -193,9 +193,12 @@ class ExecuteWriteView(BaseView):
status=400,
)
message = "Query executed, {} row{} affected".format(
cursor.rowcount, "" if cursor.rowcount == 1 else "s"
)
if cursor.rowcount == -1:
message = "Query executed"
else:
message = "Query executed, {} row{} affected".format(
cursor.rowcount, "" if cursor.rowcount == 1 else "s"
)
if _wants_json(request, is_json, data):
return _block_framing(
Response.json(

View file

@ -1,8 +1,12 @@
import json
import re
from datasette.resources import DatabaseResource, TableResource
from datasette.stored_queries import StoredQuery
from datasette.resources import DatabaseResource
from datasette.stored_queries import (
StoredQuery,
operation_is_write,
permission_for_operation,
)
from datasette.utils import (
named_parameters as derive_named_parameters,
escape_sqlite,
@ -12,6 +16,7 @@ from datasette.utils import (
InvalidSql,
)
from datasette.utils.asgi import Forbidden
from datasette.utils.sql_analysis import Operation, SQLAnalysis
_query_name_re = re.compile(r"^[^/\.\n]+$")
@ -123,11 +128,8 @@ def _coerce_query_parameters(value, derived):
return parameters
def _analysis_is_write(analysis):
return any(
access.operation in {"insert", "update", "delete"}
for access in analysis.table_accesses
)
def _analysis_is_write(analysis: SQLAnalysis) -> bool:
return any(operation_is_write(operation) for operation in analysis.operations)
def _block_framing(response):
@ -201,34 +203,66 @@ async def _analyze_user_query(datasette, db, sql, *, actor):
return is_write, derived, analysis
def _analysis_rows(analysis):
write_actions = {
"insert": "insert-row",
"update": "update-row",
"delete": "delete-row",
}
return [
{
"operation": access.operation,
"database": access.database,
"table": access.table,
"required_permission": write_actions.get(access.operation, ""),
"source": access.source,
}
for access in analysis.table_accesses
]
def _semantic_schema_operation_is_present(operations: tuple[Operation, ...]) -> bool:
return any(
operation.operation in {"create", "alter", "drop"}
and operation.target_type in {"table", "index", "view", "trigger"}
for operation in operations
)
async def _analysis_rows_with_permissions(datasette, analysis, actor):
def _display_operations(analysis: SQLAnalysis) -> list[Operation]:
has_semantic_schema_operation = _semantic_schema_operation_is_present(
analysis.operations
)
operations = []
for operation in analysis.operations:
if operation.internal and has_semantic_schema_operation:
continue
if has_semantic_schema_operation and operation.operation in {
"read",
"insert",
"update",
"delete",
"reindex",
}:
continue
operations.append(operation)
return operations
def _analysis_rows(analysis: SQLAnalysis) -> list[dict[str, object]]:
rows = []
for operation in _display_operations(analysis):
permission = permission_for_operation(operation)
required_permission = permission[0] if permission else ""
rows.append(
{
"operation": operation.operation,
"database": operation.database,
"table": operation.table or operation.target,
"required_permission": required_permission,
"source": operation.source,
}
)
return rows
async def _analysis_rows_with_permissions(
datasette, analysis: SQLAnalysis, actor
) -> list[dict[str, object]]:
rows = _analysis_rows(analysis)
for row in rows:
permission = row["required_permission"]
for row, operation in zip(rows, _display_operations(analysis)):
permission = permission_for_operation(operation)
if permission:
action, resource = permission
row["allowed"] = await datasette.allowed(
action=permission,
resource=TableResource(row["database"], row["table"]),
action=action,
resource=resource,
actor=actor,
)
elif operation_is_write(operation):
row["allowed"] = False
else:
row["allowed"] = None
return rows
@ -398,15 +432,19 @@ async def _inserted_row_url(datasette, db, analysis, cursor):
if lastrowid is None:
return None
direct_inserts = [
access
for access in analysis.table_accesses
if access.operation == "insert"
and access.source is None
and access.database == db.name
operation
for operation in analysis.operations
if operation.operation == "insert"
and operation.target_type == "table"
and not operation.internal
and operation.source is None
and operation.database == db.name
]
if len(direct_inserts) != 1:
return None
table = direct_inserts[0].table
if table is None:
return None
pks = await db.primary_keys(table)
use_rowid = not pks
select = (

View file

@ -12,10 +12,22 @@ import pytest
import pytest_asyncio
from datasette.app import Datasette
from datasette.permissions import PermissionSQL
from datasette.resources import TableResource
from datasette.resources import DatabaseResource, QueryResource, TableResource
from datasette import hookimpl
def test_resource_string_representations():
assert str(DatabaseResource("content")) == "content"
assert repr(DatabaseResource("content")) == (
"DatabaseResource(parent='content', child=None)"
)
assert str(TableResource("content", "dogs")) == "content/dogs"
assert repr(TableResource("content", "dogs")) == (
"TableResource(parent='content', child='dogs')"
)
assert str(QueryResource("content", "insert-a-dog")) == "content/insert-a-dog"
# Test plugin that provides permission rules
class PermissionRulesPlugin:
def __init__(self, rules_callback):

View file

@ -698,14 +698,17 @@ async def test_analyze_sql():
assert [
(
access.operation,
access.database,
access.sqlite_schema,
access.table,
access.columns,
access.source,
operation.operation,
operation.database,
operation.sqlite_schema,
operation.table,
operation.columns,
operation.source,
)
for access in analysis.table_accesses
for operation in analysis.operations
if operation.target_type == "table"
and operation.operation in {"read", "insert", "update", "delete"}
and not operation.internal
] == [
("read", "data", "main", "dogs", ("id", "name"), None),
]
@ -722,14 +725,17 @@ async def test_analyze_sql_insert_select():
assert {
(
access.operation,
access.database,
access.sqlite_schema,
access.table,
access.columns,
access.source,
operation.operation,
operation.database,
operation.sqlite_schema,
operation.table,
operation.columns,
operation.source,
)
for access in analysis.table_accesses
for operation in analysis.operations
if operation.target_type == "table"
and operation.operation in {"read", "insert", "update", "delete"}
and not operation.internal
} == {
("insert", "data", "main", "dogs", (), None),
("read", "data", "main", "cats", ("name",), None),

View file

@ -1643,6 +1643,172 @@ async def test_execute_write_post_requires_database_and_table_permissions():
assert (await db.execute("select name from dogs")).first()[0] == "Cleo"
@pytest.mark.asyncio
async def test_execute_write_create_table_uses_create_table_permission():
ds = Datasette(
memory=True,
default_deny=True,
config={
"permissions": {
"insert-row": {"id": "row-writer"},
"update-row": {"id": "row-writer"},
},
"databases": {
"data": {
"permissions": {
"view-database": {"id": ["creator", "row-writer"]},
"execute-write-sql": {"id": ["creator", "row-writer"]},
"create-table": {"id": "creator"},
}
}
},
},
)
db = ds.add_memory_database("execute_write_create_table", name="data")
await ds.invoke_startup()
analysis_response = await ds.client.get(
"/data/-/execute-write/analyze",
actor={"id": "creator"},
params={"sql": "create table foobar (id integer primary key, name text)"},
)
allowed_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "creator"},
json={"sql": "create table foobar (id integer primary key, name text)"},
)
row_permission_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "row-writer"},
json={"sql": "create table should_not_exist (id integer primary key)"},
)
assert analysis_response.status_code == 200
analysis_data = analysis_response.json()
assert analysis_data["ok"] is True
assert analysis_data["execute_disabled"] is False
assert analysis_data["analysis_rows"] == [
{
"operation": "create",
"database": "data",
"table": "foobar",
"required_permission": "create-table",
"source": None,
"allowed": True,
}
]
assert allowed_response.status_code == 200
assert allowed_response.json()["ok"] is True
assert allowed_response.json()["message"] == "Query executed"
assert await db.table_exists("foobar")
assert row_permission_response.status_code == 403
assert row_permission_response.json()["errors"] == [
"Permission denied: need create-table on data"
]
assert not await db.table_exists("should_not_exist")
@pytest.mark.asyncio
async def test_execute_write_alter_and_drop_table_use_schema_permissions():
ds = Datasette(
memory=True,
default_deny=True,
config={
"permissions": {
"delete-row": {"id": "row-writer"},
"update-row": {"id": "row-writer"},
},
"databases": {
"data": {
"permissions": {
"view-database": {"id": ["alterer", "dropper", "row-writer"]},
"execute-write-sql": {
"id": ["alterer", "dropper", "row-writer"]
},
},
"tables": {
"dogs": {
"permissions": {
"alter-table": {"id": "alterer"},
"drop-table": {"id": "dropper"},
}
}
},
}
},
},
)
db = ds.add_memory_database("execute_write_alter_drop_table", name="data")
await db.execute_write("create table dogs (id integer primary key, name text)")
await db.execute_write("create table cats (id integer primary key, name text)")
await ds.invoke_startup()
alter_allowed_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "alterer"},
json={"sql": "alter table dogs add column age integer"},
)
alter_row_permission_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "row-writer"},
json={"sql": "alter table cats add column age integer"},
)
assert alter_allowed_response.status_code == 200
assert "age" in [column.name for column in await db.table_column_details("dogs")]
assert alter_row_permission_response.status_code == 403
assert alter_row_permission_response.json()["errors"] == [
"Permission denied: need alter-table on data/cats"
]
assert "age" not in [
column.name for column in await db.table_column_details("cats")
]
create_index_allowed_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "alterer"},
json={"sql": "create index idx_dogs_name on dogs(name)"},
)
create_index_row_permission_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "row-writer"},
json={"sql": "create index idx_cats_name on cats(name)"},
)
drop_index_allowed_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "alterer"},
json={"sql": "drop index idx_dogs_name"},
)
assert create_index_allowed_response.status_code == 200
assert create_index_row_permission_response.status_code == 403
assert create_index_row_permission_response.json()["errors"] == [
"Permission denied: need alter-table on data/cats"
]
assert drop_index_allowed_response.status_code == 200
drop_allowed_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "dropper"},
json={"sql": "drop table dogs"},
)
drop_row_permission_response = await ds.client.post(
"/data/-/execute-write",
actor={"id": "row-writer"},
json={"sql": "drop table cats"},
)
assert drop_allowed_response.status_code == 200
assert not await db.table_exists("dogs")
assert drop_row_permission_response.status_code == 403
assert drop_row_permission_response.json()["errors"] == [
"Permission denied: need drop-table on data/cats"
]
assert await db.table_exists("cats")
@pytest.mark.asyncio
async def test_execute_write_insert_links_to_inserted_row():
ds = Datasette(memory=True, default_deny=True)

View file

@ -26,17 +26,20 @@ def conn():
conn.close()
def as_tuples(analysis):
def table_operation_tuples(analysis):
return [
(
access.operation,
access.database,
access.sqlite_schema,
access.table,
access.columns,
access.source,
operation.operation,
operation.database,
operation.sqlite_schema,
operation.table,
operation.columns,
operation.source,
)
for access in analysis.table_accesses
for operation in analysis.operations
if operation.target_type == "table"
and operation.operation in {"read", "insert", "update", "delete"}
and not operation.internal
]
@ -48,7 +51,7 @@ def test_analyze_select_tables(conn):
database_name="data",
)
assert set(as_tuples(analysis)) == {
assert set(table_operation_tuples(analysis)) == {
("read", "data", "main", "cats", ("id", "name"), None),
("read", "data", "main", "dogs", ("age", "id", "name"), None),
}
@ -57,11 +60,73 @@ def test_analyze_select_tables(conn):
def test_analyze_uses_sqlite_schema_as_default_database(conn):
analysis = analyze_sql_tables(conn, "select name from dogs")
assert set(as_tuples(analysis)) == {
assert set(table_operation_tuples(analysis)) == {
("read", "main", "main", "dogs", ("name",), None),
}
def operation_dict(operation):
return {
"operation": operation.operation,
"target_type": operation.target_type,
"database": operation.database,
"sqlite_schema": operation.sqlite_schema,
"table": operation.table,
"target": operation.target,
"columns": operation.columns,
"source": operation.source,
"internal": operation.internal,
}
def test_analyze_create_table_operation():
conn = sqlite3.connect(":memory:")
try:
analysis = analyze_sql_tables(
conn,
"create table foobar (id integer primary key, name text)",
database_name="data",
)
finally:
conn.close()
assert {
"operation": "create",
"target_type": "table",
"database": "data",
"sqlite_schema": "main",
"table": "foobar",
"target": "foobar",
"columns": (),
"source": None,
"internal": False,
} in [operation_dict(operation) for operation in analysis.operations]
assert not [
operation
for operation in analysis.operations
if operation.table in {"sqlite_master", "sqlite_schema"}
and not operation.internal
]
def test_analyze_transaction_operation(conn):
analysis = analyze_sql_tables(conn, "commit", database_name="data")
assert [operation_dict(operation) for operation in analysis.operations] == [
{
"operation": "commit",
"target_type": "transaction",
"database": None,
"sqlite_schema": None,
"table": None,
"target": "COMMIT",
"columns": (),
"source": None,
"internal": False,
}
]
def test_analyze_insert_tables(conn):
analysis = analyze_sql_tables(
conn,
@ -70,7 +135,7 @@ def test_analyze_insert_tables(conn):
database_name="data",
)
assert set(as_tuples(analysis)) == {
assert set(table_operation_tuples(analysis)) == {
("insert", "data", "main", "dogs", (), None),
("read", "data", "main", "dogs", ("id", "name"), "dogs_after_insert"),
("update", "data", "main", "cats", ("name",), "dogs_after_insert"),
@ -87,7 +152,7 @@ def test_analyze_update_tables(conn):
database_name="data",
)
assert set(as_tuples(analysis)) == {
assert set(table_operation_tuples(analysis)) == {
("update", "data", "main", "dogs", ("age",), None),
("read", "data", "main", "dogs", ("age", "name"), None),
}
@ -101,7 +166,7 @@ def test_analyze_delete_tables(conn):
database_name="data",
)
assert set(as_tuples(analysis)) == {
assert set(table_operation_tuples(analysis)) == {
("delete", "data", "main", "dogs", (), None),
("read", "data", "main", "dogs", ("name",), None),
}
@ -121,7 +186,7 @@ def test_analyze_insert_select_with_cte(conn):
database_name="data",
)
assert set(as_tuples(analysis)) == {
assert set(table_operation_tuples(analysis)) == {
("insert", "data", "main", "cats", (), None),
("read", "data", "main", "dogs", ("age", "name"), "old_dogs"),
}
@ -135,7 +200,7 @@ def test_analyze_view_with_instead_of_trigger(conn):
database_name="data",
)
assert set(as_tuples(analysis)) == {
assert set(table_operation_tuples(analysis)) == {
("update", "data", "main", "dog_names", ("name",), None),
("read", "data", "main", "dogs", ("id", "name"), "dog_names"),
("read", "data", "main", "dog_names", ("id", "name"), "dog_names"),
@ -163,7 +228,7 @@ def test_analyze_attached_database_tables(conn):
schema_to_database={"extra": "extra_db"},
)
assert set(as_tuples(analysis)) == {
assert set(table_operation_tuples(analysis)) == {
("insert", "extra_db", "extra", "people", (), None),
("read", "data", "main", "dogs", ("name",), None),
}