import pytest from datasette.utils.sqlite import sqlite3 from datasette.utils.sql_analysis import analyze_sql_tables @pytest.fixture def conn(): conn = sqlite3.connect(":memory:") conn.executescript(""" create table dogs (id integer primary key, name text, age integer); create table cats (id integer primary key, name text); create table log (message text); create view dog_names as select id, name from dogs; create trigger dogs_after_insert after insert on dogs begin update cats set name = new.name where id = new.id; insert into log (message) values (new.name); end; create trigger dog_names_instead_of_update instead of update on dog_names begin update dogs set name = new.name where id = old.id; end; """) try: yield conn finally: conn.close() def as_tuples(analysis): return [ ( access.operation, access.database, access.sqlite_schema, access.table, access.columns, access.source, ) for access in analysis.table_accesses ] def test_analyze_select_tables(conn): analysis = analyze_sql_tables( conn, "select dogs.name, cats.name from dogs join cats on dogs.id = cats.id where dogs.age > ?", (2,), database_name="data", ) assert set(as_tuples(analysis)) == { ("read", "data", "main", "cats", ("id", "name"), None), ("read", "data", "main", "dogs", ("age", "id", "name"), None), } def test_analyze_uses_sqlite_schema_as_default_database(conn): analysis = analyze_sql_tables(conn, "select name from dogs") assert set(as_tuples(analysis)) == { ("read", "main", "main", "dogs", ("name",), None), } def test_analyze_insert_tables(conn): analysis = analyze_sql_tables( conn, "insert into dogs (name, age) values (:name, :age)", {"name": "Cleo", "age": 4}, database_name="data", ) assert set(as_tuples(analysis)) == { ("insert", "data", "main", "dogs", (), None), ("read", "data", "main", "dogs", ("id", "name"), "dogs_after_insert"), ("update", "data", "main", "cats", ("name",), "dogs_after_insert"), ("read", "data", "main", "cats", ("id",), "dogs_after_insert"), ("insert", "data", "main", "log", (), "dogs_after_insert"), } def test_analyze_update_tables(conn): analysis = analyze_sql_tables( conn, "update dogs set age = age + 1 where name = ?", ("Cleo",), database_name="data", ) assert set(as_tuples(analysis)) == { ("update", "data", "main", "dogs", ("age",), None), ("read", "data", "main", "dogs", ("age", "name"), None), } def test_analyze_delete_tables(conn): analysis = analyze_sql_tables( conn, "delete from dogs where name = ?", ("Cleo",), database_name="data", ) assert set(as_tuples(analysis)) == { ("delete", "data", "main", "dogs", (), None), ("read", "data", "main", "dogs", ("name",), None), } def test_analyze_insert_select_with_cte(conn): analysis = analyze_sql_tables( conn, """ with old_dogs as ( select name from dogs where age > :age ) insert into cats (name) select name from old_dogs """, {"age": 10}, database_name="data", ) assert set(as_tuples(analysis)) == { ("insert", "data", "main", "cats", (), None), ("read", "data", "main", "dogs", ("age", "name"), "old_dogs"), } def test_analyze_view_with_instead_of_trigger(conn): analysis = analyze_sql_tables( conn, "update dog_names set name = :name where id = :id", {"name": "Zelda", "id": 1}, database_name="data", ) assert set(as_tuples(analysis)) == { ("update", "data", "main", "dog_names", ("name",), None), ("read", "data", "main", "dogs", ("id", "name"), "dog_names"), ("read", "data", "main", "dog_names", ("id", "name"), "dog_names"), ( "read", "data", "main", "dog_names", ("id", "name"), "dog_names_instead_of_update", ), ("update", "data", "main", "dogs", ("name",), "dog_names_instead_of_update"), ("read", "data", "main", "dogs", ("id",), "dog_names_instead_of_update"), } def test_analyze_attached_database_tables(conn): conn.execute("attach database ':memory:' as extra") conn.execute("create table extra.people (id integer primary key, name text)") analysis = analyze_sql_tables( conn, "insert into extra.people (name) select name from dogs", database_name="data", schema_to_database={"extra": "extra_db"}, ) assert set(as_tuples(analysis)) == { ("insert", "extra_db", "extra", "people", (), None), ("read", "data", "main", "dogs", ("name",), None), } def test_analyze_invalid_sql_cleans_up_authorizer(conn): with pytest.raises(sqlite3.OperationalError): analyze_sql_tables(conn, "insert into missing_table values (1)") conn.execute("select name from dogs").fetchall() def test_analyze_clears_authorizer_on_error(): class FakeConnection: def __init__(self): self.authorizers = [] def set_authorizer(self, authorizer): self.authorizers.append(authorizer) def execute(self, sql, params): raise sqlite3.OperationalError("bad SQL") conn = FakeConnection() with pytest.raises(sqlite3.OperationalError): analyze_sql_tables(conn, "bad SQL") assert conn.authorizers[-1] is None