Detect and disallow writes (insert/update/delete) to virtual/shadow tables

Refs https://github.com/simonw/datasette/pull/2749#issuecomment-4565727978
This commit is contained in:
Simon Willison 2026-05-28 08:36:59 -07:00
commit bcd989f4f8
6 changed files with 381 additions and 2 deletions

View file

@ -713,6 +713,11 @@ def operation_should_be_ignored(operation: Operation) -> bool:
def operation_forbidden_message(operation: Operation) -> str | None:
    """Return the refusal message for a forbidden operation, or None if allowed.

    VACUUM is always refused. Insert, update and delete operations are refused
    when the operation's ``table_kind`` is ``"virtual"`` or ``"shadow"``.
    """
    action = operation.operation
    if action == "vacuum":
        return "VACUUM is not allowed in user-supplied SQL"
    if action not in ("insert", "update", "delete"):
        return None
    # Only these two table kinds block a write; any other kind (or None) passes.
    blocked_write_messages = {
        "virtual": "Writes to virtual tables are not allowed in user-supplied SQL",
        "shadow": "Writes to shadow tables are not allowed in user-supplied SQL",
    }
    return blocked_write_messages.get(operation.table_kind)

View file

@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Literal
from datasette.utils.sqlite import sqlite3
from datasette.utils.sqlite import SQLiteTableType, sqlite3, sqlite_table_type
SQLOperation = Literal[
"read",
@ -42,6 +42,7 @@ SQLTargetType = Literal[
SQLTableOperation = Literal["read", "insert", "update", "delete"]
SQLSchemaOperation = Literal["create", "drop"]
SQLSchemaTargetType = Literal["index", "table", "trigger", "view", "virtual-table"]
SQLTableKind = SQLiteTableType
@dataclass(frozen=True)
@ -51,6 +52,7 @@ class Operation:
database: str | None
table: str | None
sqlite_schema: str | None
table_kind: SQLTableKind | None = None
target: str | None = None
columns: tuple[str, ...] = ()
source: str | None = None
@ -500,6 +502,22 @@ def analyze_sql_tables(
return True
return False
# Memoizes sqlite_table_type() results per (schema, table) pair so each table
# is looked up at most once while building the analysis.
table_kind_cache: dict[tuple[str | None, str], SQLTableKind | None] = {}

def table_kind_for(key: OperationKey) -> SQLTableKind | None:
    """Return the table kind for a table-level operation key, else None.

    Only keys whose target type is ``"table"``, whose operation is one of
    read/insert/update/delete, and which name a table are looked up.
    """
    is_table_operation = (
        key.target_type == "table"
        and key.operation in {"read", "insert", "update", "delete"}
        and key.table is not None
    )
    if not is_table_operation:
        return None
    lookup = (key.sqlite_schema, key.table)
    try:
        return table_kind_cache[lookup]
    except KeyError:
        kind = sqlite_table_type(conn, key.table, schema=key.sqlite_schema)
        table_kind_cache[lookup] = kind
        return kind
return SQLAnalysis(
operations=tuple(
Operation(
@ -508,6 +526,7 @@ def analyze_sql_tables(
database=key.database,
table=key.table,
sqlite_schema=key.sqlite_schema,
table_kind=table_kind_for(key),
target=key.target,
columns=tuple(sorted(columns)),
source=key.source,

View file

@ -1,3 +1,6 @@
import re
from typing import Literal
using_pysqlite3 = False
try:
import pysqlite3 as sqlite3
@ -10,6 +13,18 @@ if hasattr(sqlite3, "enable_callback_tracebacks"):
sqlite3.enable_callback_tracebacks(True)
_cached_sqlite_version = None
# The kinds of table-like object that sqlite_table_type() can report.
SQLiteTableType = Literal["table", "view", "virtual", "shadow"]

# Captures the module name that follows USING in a CREATE VIRTUAL TABLE
# statement. re.DOTALL lets the lazy ``.*?`` cross line breaks, so statements
# written with a newline between the table name and USING are still matched.
_VIRTUAL_TABLE_MODULE_RE = re.compile(
    r"\bCREATE\s+VIRTUAL\s+TABLE\b.*?\bUSING\s+([^\s(]+)",
    re.IGNORECASE | re.DOTALL,
)

# Maps a lower-cased virtual table module name to the name suffixes used to
# recognise its shadow tables (shadow table name = virtual table name + suffix).
_VIRTUAL_TABLE_SHADOW_SUFFIXES = {
    "fts3": ("_content", "_segdir", "_segments", "_stat", "_docsize"),
    "fts4": ("_content", "_segdir", "_segments", "_stat", "_docsize"),
    "fts5": ("_data", "_idx", "_docsize", "_content", "_config"),
    "rtree": ("_node", "_parent", "_rowid"),
    "rtree_i32": ("_node", "_parent", "_rowid"),
}
def sqlite_version():
@ -36,5 +51,102 @@ def supports_table_xinfo():
return sqlite_version() >= (3, 26, 0)
def supports_table_list():
    """Return True when the running SQLite version is 3.37.0 or newer.

    sqlite_table_type() uses this to decide whether to query
    ``pragma_table_list``.
    """
    minimum_version = (3, 37, 0)
    return sqlite_version() >= minimum_version
def supports_generated_columns():
    """Return True when the running SQLite version is 3.31.0 or newer."""
    minimum_version = (3, 31, 0)
    return sqlite_version() >= minimum_version
def sqlite_table_type(
    conn,
    table: str,
    *,
    schema: str | None = "main",
) -> SQLiteTableType | None:
    """Classify ``table`` as "table", "view", "virtual" or "shadow".

    When supports_table_list() is true the answer is read from
    ``pragma_table_list``, filtered by ``schema`` unless it is None. If that
    query raises a database error, or yields no recognised type, the result
    comes from _sqlite_table_type_from_schema() instead.
    """
    if supports_table_list():
        sql = "select type from pragma_table_list where name = ?"
        args = [table]
        if schema is not None:
            sql += " and schema = ?"
            args.append(schema)
        try:
            found = conn.execute(sql, tuple(args)).fetchone()
        except sqlite3.DatabaseError:
            # Best effort: fall through to the schema-table lookup below.
            found = None
        if found is not None and found[0] in ("table", "view", "virtual", "shadow"):
            return found[0]
    return _sqlite_table_type_from_schema(conn, table, schema=schema)
def _sqlite_table_type_from_schema(
    conn,
    table: str,
    *,
    schema: str | None = "main",
) -> SQLiteTableType | None:
    """Classify ``table`` by reading the schema table directly.

    Returns "view" for views, "virtual" when the stored SQL names a virtual
    table module, "shadow" when _is_known_shadow_table() recognises the name,
    and "table" otherwise. Returns None when no table or view has that name
    or when the schema table cannot be queried.
    """
    schema_table = _sqlite_schema_table(schema)
    try:
        # Restrict the match to tables and views: a trigger may share its name
        # with a table, and picking up that row first would hide the table.
        row = conn.execute(
            "select type, sql from {} where name = ? "
            "and type in ('table', 'view')".format(schema_table),
            (table,),
        ).fetchone()
    except sqlite3.DatabaseError:
        return None
    if row is None:
        return None
    object_type, sql = row
    if object_type == "view":
        return "view"
    # Virtual tables are stored with type 'table'; the SQL text tells them apart.
    if _virtual_table_module(sql) is not None:
        return "virtual"
    if _is_known_shadow_table(conn, table, schema=schema):
        return "shadow"
    return "table"
def _is_known_shadow_table(
    conn,
    table: str,
    *,
    schema: str | None = "main",
) -> bool:
    """Return True if ``table`` is named like a shadow table of a virtual table.

    Every table row in the schema table is inspected; for each one whose SQL
    names a module listed in _VIRTUAL_TABLE_SHADOW_SUFFIXES, ``table`` is
    compared against that table's name plus each known suffix. Returns False
    if the schema table cannot be queried.
    """
    schema_table = _sqlite_schema_table(schema)
    try:
        candidates = conn.execute(
            "select name, sql from {} where type = 'table'".format(schema_table)
        ).fetchall()
    except sqlite3.DatabaseError:
        return False
    for owner, create_sql in candidates:
        module = _virtual_table_module(create_sql)
        if module is None:
            suffixes = ()
        else:
            suffixes = _VIRTUAL_TABLE_SHADOW_SUFFIXES.get(module, ())
        if any(table == owner + suffix for suffix in suffixes):
            return True
    return False
def _sqlite_schema_table(schema: str | None) -> str:
if schema is None or schema == "main":
return "sqlite_master"
if schema == "temp":
return "sqlite_temp_master"
return "{}.sqlite_master".format(_quote_identifier(schema))
def _quote_identifier(value: str) -> str:
    """Wrap ``value`` in double quotes, doubling any embedded double quotes."""
    escaped = value.replace('"', '""')
    return '"{}"'.format(escaped)
def _virtual_table_module(sql: str | None) -> str | None:
if not sql:
return None
match = _VIRTUAL_TABLE_MODULE_RE.search(sql)
if match is None:
return None
return match.group(1).strip("\"'[]`").lower()

View file

@ -2193,6 +2193,159 @@ async def test_trusted_stored_write_query_skips_vacuum_filtering():
assert response.json()["ok"] is True
@pytest.mark.asyncio
async def test_execute_write_rejects_virtual_table_control_insert():
    """A 'delete-all' insert into an FTS5 table is refused and changes nothing."""
    datasette = Datasette(memory=True, default_deny=True)
    datasette.root_enabled = True
    database = datasette.add_memory_database(
        "execute_write_virtual_table_control", name="data"
    )
    setup_statements = (
        """
        create virtual table docs using fts5(title, body, content='')
        """,
        """
        insert into docs(rowid, title, body) values (1, 'hello', 'world')
        """,
    )
    for statement in setup_statements:
        await database.execute_write(statement)
    await datasette.invoke_startup()

    response = await datasette.client.post(
        "/data/-/execute-write",
        actor={"id": "root"},
        json={"sql": "insert into docs(docs) values('delete-all')"},
    )

    assert response.status_code == 403
    expected_errors = ["Writes to virtual tables are not allowed in user-supplied SQL"]
    assert response.json()["errors"] == expected_errors
    # The row indexed during setup must still be searchable.
    matches = await database.execute(
        "select count(*) from docs where docs match 'hello'"
    )
    assert matches.first()[0] == 1
@pytest.mark.asyncio
async def test_execute_write_rejects_regular_virtual_table_insert():
    """An ordinary row insert into an FTS5 virtual table is refused with a 403."""
    datasette = Datasette(memory=True, default_deny=True)
    datasette.root_enabled = True
    database = datasette.add_memory_database(
        "execute_write_virtual_table_insert", name="data"
    )
    await database.execute_write("create virtual table docs using fts5(title, body)")
    await datasette.invoke_startup()

    payload = {"sql": "insert into docs(rowid, title, body) values (1, 'a', 'b')"}
    response = await datasette.client.post(
        "/data/-/execute-write",
        actor={"id": "root"},
        json=payload,
    )

    assert response.status_code == 403
    expected_errors = ["Writes to virtual tables are not allowed in user-supplied SQL"]
    assert response.json()["errors"] == expected_errors
    # Nothing should have been written.
    row_count = await database.execute("select count(*) from docs")
    assert row_count.first()[0] == 0
@pytest.mark.asyncio
async def test_execute_write_rejects_shadow_table_insert():
    """An insert into the docs_config shadow table is refused with a 403."""
    datasette = Datasette(memory=True, default_deny=True)
    datasette.root_enabled = True
    database = datasette.add_memory_database(
        "execute_write_shadow_table_insert", name="data"
    )
    await database.execute_write("create virtual table docs using fts5(title, body)")
    await datasette.invoke_startup()

    payload = {"sql": "insert into docs_config(k, v) values ('x', 1)"}
    response = await datasette.client.post(
        "/data/-/execute-write",
        actor={"id": "root"},
        json=payload,
    )

    assert response.status_code == 403
    expected_errors = ["Writes to shadow tables are not allowed in user-supplied SQL"]
    assert response.json()["errors"] == expected_errors
    # The shadow table keeps exactly the one row it had before the request.
    config_rows = await database.execute("select count(*) from docs_config")
    assert config_rows.first()[0] == 1
@pytest.mark.asyncio
async def test_untrusted_stored_write_query_rejects_virtual_table_control_insert():
    """An untrusted stored write query may not issue 'delete-all' on an FTS5 table."""
    datasette = Datasette(memory=True, default_deny=True)
    datasette.root_enabled = True
    database = datasette.add_memory_database(
        "stored_query_virtual_table_control", name="data"
    )
    setup_statements = (
        """
        create virtual table docs using fts5(title, body, content='')
        """,
        """
        insert into docs(rowid, title, body) values (1, 'hello', 'world')
        """,
    )
    for statement in setup_statements:
        await database.execute_write(statement)
    await datasette.invoke_startup()
    await datasette.add_query(
        "data",
        "delete_all_docs",
        "insert into docs(docs) values('delete-all')",
        is_write=True,
        is_trusted=False,
        source="user",
        owner_id="root",
    )

    response = await datasette.client.post(
        "/data/delete_all_docs?_json=1",
        actor={"id": "root"},
        data={},
    )

    assert response.status_code == 403
    expected_message = "Writes to virtual tables are not allowed in user-supplied SQL"
    assert response.json()["message"] == (expected_message)
    # The row indexed during setup must still be searchable.
    matches = await database.execute(
        "select count(*) from docs where docs match 'hello'"
    )
    assert matches.first()[0] == 1
@pytest.mark.asyncio
async def test_trusted_stored_write_query_can_write_virtual_table():
    """A trusted stored write query is allowed to run 'delete-all' on an FTS5 table."""
    writer_permissions = {
        "view-database": {"id": "writer"},
        "view-query": {"id": "writer"},
    }
    datasette = Datasette(
        memory=True,
        default_deny=True,
        config={"databases": {"data": {"permissions": writer_permissions}}},
    )
    database = datasette.add_memory_database(
        "trusted_stored_query_virtual_table", name="data"
    )
    setup_statements = (
        """
        create virtual table docs using fts5(title, body, content='')
        """,
        """
        insert into docs(rowid, title, body) values (1, 'hello', 'world')
        """,
    )
    for statement in setup_statements:
        await database.execute_write(statement)
    await datasette.invoke_startup()
    await datasette.add_query(
        "data",
        "trusted_delete_all",
        "insert into docs(docs) values('delete-all')",
        is_write=True,
        is_trusted=True,
        source="config",
    )

    result = await datasette.client.post(
        "/data/trusted_delete_all?_json=1",
        actor={"id": "writer"},
        data={},
    )

    assert result.status_code == 200
    assert result.json()["ok"] is True
    # The trusted query ran, so the previously indexed row no longer matches.
    matches = await database.execute(
        "select count(*) from docs where docs match 'hello'"
    )
    assert matches.first()[0] == 0
@pytest.mark.asyncio
async def test_execute_write_create_table_uses_create_table_permission():
ds = Datasette(

View file

@ -5,7 +5,7 @@ Tests for various datasette helper functions.
from datasette.app import Datasette
from datasette import utils
from datasette.utils.asgi import Request
from datasette.utils.sqlite import sqlite3
from datasette.utils.sqlite import sqlite3, sqlite_table_type
import json
import os
import pathlib
@ -226,6 +226,49 @@ def test_detect_fts_different_table_names(table):
conn.close()
@pytest.mark.parametrize("use_fallback", (False, True))
def test_sqlite_table_type_detects_virtual_and_shadow_tables(monkeypatch, use_fallback):
    """sqlite_table_type classifies tables, views, virtual and shadow tables.

    With ``use_fallback`` the reported SQLite version is patched to 3.25.0 so
    the schema-table code path is exercised instead of pragma_table_list.
    """
    if use_fallback:
        monkeypatch.setattr("datasette.utils.sqlite.sqlite_version", lambda: (3, 25, 0))
    expected_kinds = (
        ("dogs", "table"),
        ("dog_names", "view"),
        ("search_index", "virtual"),
        ("search_index_config", "shadow"),
        ("boxes", "virtual"),
        ("boxes_node", "shadow"),
    )
    conn = utils.sqlite3.connect(":memory:")
    try:
        conn.executescript("""
            create table dogs(id integer primary key, name text);
            create view dog_names as select name from dogs;
            create virtual table search_index using fts5(title, body);
            create virtual table boxes using rtree(id, minx, maxx, miny, maxy);
        """)
        for name, kind in expected_kinds:
            assert sqlite_table_type(conn, name) == kind
        assert sqlite_table_type(conn, "missing") is None
    finally:
        conn.close()
@pytest.mark.parametrize("use_fallback", (False, True))
def test_sqlite_table_type_detects_attached_database_tables(monkeypatch, use_fallback):
    """sqlite_table_type honours ``schema`` for tables in an attached database.

    With ``use_fallback`` the reported SQLite version is patched to 3.25.0 so
    the schema-table code path is exercised instead of pragma_table_list.
    """
    if use_fallback:
        monkeypatch.setattr("datasette.utils.sqlite.sqlite_version", lambda: (3, 25, 0))
    expected_kinds = (
        ("cats", "table"),
        ("cat_search", "virtual"),
        ("cat_search_data", "shadow"),
    )
    conn = utils.sqlite3.connect(":memory:")
    try:
        conn.executescript("""
            attach database ':memory:' as extra;
            create table extra.cats(id integer primary key, name text);
            create virtual table extra.cat_search using fts5(name);
        """)
        for name, kind in expected_kinds:
            assert sqlite_table_type(conn, name, schema="extra") == kind
    finally:
        conn.close()
@pytest.mark.parametrize(
"url,expected",
[

View file

@ -260,6 +260,53 @@ def test_analyze_create_virtual_table_operation():
} in [operation_dict(operation) for operation in analysis.operations]
def test_analyze_table_kind_for_regular_virtual_and_shadow_tables():
    """Insert operations carry the table_kind of the table they target."""
    # Target table -> insert statement to analyze, in the order they are run.
    inserts = {
        "dogs": "insert into dogs (name) values ('Cleo')",
        "docs": "insert into docs(docs) values('delete-all')",
        "docs_config": "insert into docs_config(k, v) values ('x', 1)",
    }
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript("""
            create table dogs (id integer primary key, name text);
            create virtual table docs using fts5(title, body, content='');
        """)
        analyses = {
            table: analyze_sql_tables(conn, sql, database_name="data")
            for table, sql in inserts.items()
        }
    finally:
        conn.close()

    def insert_operation(table):
        # First insert operation recorded against the given table.
        return next(
            operation
            for operation in analyses[table].operations
            if operation.operation == "insert" and operation.table == table
        )

    assert insert_operation("dogs").table_kind == "table"
    assert insert_operation("docs").table_kind == "virtual"
    assert insert_operation("docs_config").table_kind == "shadow"
def test_analyze_create_table_as_select_function_is_not_internal():
conn = sqlite3.connect(":memory:")
try: