Use allowed_resources_sql() with CTE for table filtering

This commit is contained in:
Simon Willison 2025-10-23 15:33:35 -07:00
commit 4b50cc7bc1
2 changed files with 36 additions and 33 deletions

View file

@ -953,36 +953,35 @@ class TablesView(BaseView):
terms = q.split()
pattern = "%" + "%".join(terms) + "%"
matches = []
# Query each database's sqlite_master for matching tables
for db_name in self.ds.databases.keys():
db = self.ds.databases[db_name]
try:
# Use SQL to find matching table names, ordered by shortest first
result = await db.execute(
"""
SELECT name
FROM sqlite_master
WHERE type='table'
AND name LIKE ? COLLATE NOCASE
ORDER BY length(name), name
""",
[pattern]
)
# Check permissions for each matching table
for row in result.rows:
table_name = row[0]
if await self.ds.permission_allowed(
request.actor,
"view-table",
resource=(db_name, table_name)
):
matches.append({
"name": f"{db_name}: {table_name}",
"url": self.ds.urls.table(db_name, table_name),
})
except Exception:
# Skip databases that can't be queried
continue
# Get SQL for allowed resources using the permission system
permission_sql, params = await self.ds.allowed_resources_sql(
action="view-table", actor=request.actor
)
# Build query with CTE to filter by search pattern
sql = f"""
WITH allowed_tables AS (
{permission_sql}
)
SELECT parent, child
FROM allowed_tables
WHERE child LIKE :pattern COLLATE NOCASE
ORDER BY length(child), child
"""
# Merge params from permission SQL with our pattern param
all_params = {**params, "pattern": pattern}
# Execute against internal database
result = await self.ds.get_internal_database().execute(sql, all_params)
# Build response
matches = [
{
"name": f"{row['parent']}: {row['child']}",
"url": self.ds.urls.table(row["parent"], row["child"]),
}
for row in result.rows
]
return Response.json({"matches": matches})

View file

@ -523,7 +523,9 @@ async def test_tables_endpoint_config_database_allow():
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in root_tables
]
restricted_tables_root = [m for m in root_list if m["name"].startswith("restricted_db/")]
restricted_tables_root = [
m for m in root_list if m["name"].startswith("restricted_db/")
]
assert len(restricted_tables_root) == 2
table_names = {m["name"] for m in restricted_tables_root}
assert "restricted_db/users" in table_names
@ -535,7 +537,9 @@ async def test_tables_endpoint_config_database_allow():
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in alice_tables
]
restricted_tables_alice = [m for m in alice_list if m["name"].startswith("restricted_db/")]
restricted_tables_alice = [
m for m in alice_list if m["name"].startswith("restricted_db/")
]
assert len(restricted_tables_alice) == 0
# But Alice should see public_db tables (no restrictions)