mirror of
https://github.com/simonw/datasette.git
synced 2026-05-27 20:36:17 +02:00
188 lines
5.4 KiB
Python
188 lines
5.4 KiB
Python
import pytest
|
|
|
|
from datasette.utils.sqlite import sqlite3
|
|
from datasette.utils.sql_analysis import analyze_sql_tables
|
|
|
|
|
|
@pytest.fixture
def conn():
    """Yield an in-memory SQLite connection preloaded with the test schema.

    The schema contains three tables (dogs, cats, log), a dog_names view
    over dogs, an AFTER INSERT trigger on dogs that updates cats and
    inserts into log, and an INSTEAD OF UPDATE trigger on dog_names that
    updates dogs. The connection is closed once the test finishes.
    """
    schema = """
        create table dogs (id integer primary key, name text, age integer);
        create table cats (id integer primary key, name text);
        create table log (message text);
        create view dog_names as select id, name from dogs;
        create trigger dogs_after_insert after insert on dogs begin
            update cats set name = new.name where id = new.id;
            insert into log (message) values (new.name);
        end;
        create trigger dog_names_instead_of_update instead of update on dog_names begin
            update dogs set name = new.name where id = old.id;
        end;
    """
    db = sqlite3.connect(":memory:")
    db.executescript(schema)
    try:
        yield db
    finally:
        # Runs at fixture teardown, whether or not the test passed.
        db.close()
|
|
|
|
|
|
def as_tuples(analysis):
    """Flatten every entry of ``analysis.table_accesses`` into a plain tuple.

    Each tuple is ``(operation, database, sqlite_schema, table, columns,
    source)``, in that order, so results can be compared against literal
    expectations. Returns a list in the same order as ``table_accesses``.
    """
    fields = (
        "operation",
        "database",
        "sqlite_schema",
        "table",
        "columns",
        "source",
    )
    rows = []
    for entry in analysis.table_accesses:
        rows.append(tuple(getattr(entry, field) for field in fields))
    return rows
|
|
|
|
|
|
def test_analyze_select_tables(conn):
    """A two-table join reports one read per table with the columns it touches."""
    sql = "select dogs.name, cats.name from dogs join cats on dogs.id = cats.id where dogs.age > ?"
    result = analyze_sql_tables(conn, sql, (2,), database_name="data")

    expected = {
        ("read", "data", "main", "cats", ("id", "name"), None),
        ("read", "data", "main", "dogs", ("age", "id", "name"), None),
    }
    assert set(as_tuples(result)) == expected
|
|
|
|
|
|
def test_analyze_uses_sqlite_schema_as_default_database(conn):
    """With no database_name given, the reported database is "main"."""
    result = analyze_sql_tables(conn, "select name from dogs")

    expected = {("read", "main", "main", "dogs", ("name",), None)}
    assert set(as_tuples(result)) == expected
|
|
|
|
|
|
def test_analyze_insert_tables(conn):
    """An insert into dogs also reports the accesses made by dogs_after_insert."""
    sql = "insert into dogs (name, age) values (:name, :age)"
    bindings = {"name": "Cleo", "age": 4}
    result = analyze_sql_tables(conn, sql, bindings, database_name="data")

    expected = {
        # The statement itself.
        ("insert", "data", "main", "dogs", (), None),
        # Accesses attributed to the AFTER INSERT trigger.
        ("read", "data", "main", "dogs", ("id", "name"), "dogs_after_insert"),
        ("update", "data", "main", "cats", ("name",), "dogs_after_insert"),
        ("read", "data", "main", "cats", ("id",), "dogs_after_insert"),
        ("insert", "data", "main", "log", (), "dogs_after_insert"),
    }
    assert set(as_tuples(result)) == expected
|
|
|
|
|
|
def test_analyze_update_tables(conn):
    """An update reports the written column plus the columns it reads."""
    sql = "update dogs set age = age + 1 where name = ?"
    result = analyze_sql_tables(conn, sql, ("Cleo",), database_name="data")

    expected = {
        ("update", "data", "main", "dogs", ("age",), None),
        ("read", "data", "main", "dogs", ("age", "name"), None),
    }
    assert set(as_tuples(result)) == expected
|
|
|
|
|
|
def test_analyze_delete_tables(conn):
    """A delete reports the delete itself and the read of its WHERE column."""
    sql = "delete from dogs where name = ?"
    result = analyze_sql_tables(conn, sql, ("Cleo",), database_name="data")

    expected = {
        ("delete", "data", "main", "dogs", (), None),
        ("read", "data", "main", "dogs", ("name",), None),
    }
    assert set(as_tuples(result)) == expected
|
|
|
|
|
|
def test_analyze_insert_select_with_cte(conn):
    """Reads made inside a CTE are reported with the CTE name as their source."""
    sql = """
        with old_dogs as (
            select name from dogs where age > :age
        )
        insert into cats (name)
        select name from old_dogs
        """
    result = analyze_sql_tables(conn, sql, {"age": 10}, database_name="data")

    expected = {
        ("insert", "data", "main", "cats", (), None),
        ("read", "data", "main", "dogs", ("age", "name"), "old_dogs"),
    }
    assert set(as_tuples(result)) == expected
|
|
|
|
|
|
def test_analyze_view_with_instead_of_trigger(conn):
    """Updating a view reports the view, its base table and the trigger's work."""
    sql = "update dog_names set name = :name where id = :id"
    bindings = {"name": "Zelda", "id": 1}
    result = analyze_sql_tables(conn, sql, bindings, database_name="data")

    trigger = "dog_names_instead_of_update"
    expected = {
        ("update", "data", "main", "dog_names", ("name",), None),
        # Accesses whose source is the dog_names view.
        ("read", "data", "main", "dogs", ("id", "name"), "dog_names"),
        ("read", "data", "main", "dog_names", ("id", "name"), "dog_names"),
        # Accesses whose source is the INSTEAD OF UPDATE trigger.
        ("read", "data", "main", "dog_names", ("id", "name"), trigger),
        ("update", "data", "main", "dogs", ("name",), trigger),
        ("read", "data", "main", "dogs", ("id",), trigger),
    }
    assert set(as_tuples(result)) == expected
|
|
|
|
|
|
def test_analyze_attached_database_tables(conn):
    """Tables in an attached schema are reported under the mapped database name."""
    for statement in (
        "attach database ':memory:' as extra",
        "create table extra.people (id integer primary key, name text)",
    ):
        conn.execute(statement)

    result = analyze_sql_tables(
        conn,
        "insert into extra.people (name) select name from dogs",
        database_name="data",
        schema_to_database={"extra": "extra_db"},
    )

    expected = {
        # The "extra" schema is reported as "extra_db" via schema_to_database.
        ("insert", "extra_db", "extra", "people", (), None),
        ("read", "data", "main", "dogs", ("name",), None),
    }
    assert set(as_tuples(result)) == expected
|
|
|
|
|
|
def test_analyze_clears_authorizer_on_error():
    """The last authorizer set must be None even when execute() raises."""

    class RaisingConnection:
        """Connection stand-in that records authorizers and always fails to execute."""

        def __init__(self):
            # Every value passed to set_authorizer, in call order.
            self.authorizers = []

        def set_authorizer(self, authorizer):
            self.authorizers.append(authorizer)

        def execute(self, sql, params):
            raise sqlite3.OperationalError("bad SQL")

    fake = RaisingConnection()

    with pytest.raises(sqlite3.OperationalError):
        analyze_sql_tables(fake, "bad SQL")

    final_authorizer = fake.authorizers[-1]
    assert final_authorizer is None
|