Use allowed_resources_sql() with CTE for table filtering

This commit is contained in:
Simon Willison 2025-10-23 15:33:35 -07:00
commit 96d2e16e83
2 changed files with 36 additions and 33 deletions

View file

@ -953,36 +953,35 @@ class TablesView(BaseView):
terms = q.split() terms = q.split()
pattern = "%" + "%".join(terms) + "%" pattern = "%" + "%".join(terms) + "%"
matches = [] # Get SQL for allowed resources using the permission system
# Query each database's sqlite_master for matching tables permission_sql, params = await self.ds.allowed_resources_sql(
for db_name in self.ds.databases.keys(): action="view-table", actor=request.actor
db = self.ds.databases[db_name] )
try:
# Use SQL to find matching table names, ordered by shortest first # Build query with CTE to filter by search pattern
result = await db.execute( sql = f"""
""" WITH allowed_tables AS (
SELECT name {permission_sql}
FROM sqlite_master )
WHERE type='table' SELECT parent, child
AND name LIKE ? COLLATE NOCASE FROM allowed_tables
ORDER BY length(name), name WHERE child LIKE :pattern COLLATE NOCASE
""", ORDER BY length(child), child
[pattern] """
)
# Check permissions for each matching table # Merge params from permission SQL with our pattern param
for row in result.rows: all_params = {**params, "pattern": pattern}
table_name = row[0]
if await self.ds.permission_allowed( # Execute against internal database
request.actor, result = await self.ds.get_internal_database().execute(sql, all_params)
"view-table",
resource=(db_name, table_name) # Build response
): matches = [
matches.append({ {
"name": f"{db_name}: {table_name}", "name": f"{row['parent']}: {row['child']}",
"url": self.ds.urls.table(db_name, table_name), "url": self.ds.urls.table(row["parent"], row["child"]),
}) }
except Exception: for row in result.rows
# Skip databases that can't be queried ]
continue
return Response.json({"matches": matches}) return Response.json({"matches": matches})

View file

@ -523,7 +523,9 @@ async def test_tables_endpoint_config_database_allow():
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in root_tables for t in root_tables
] ]
restricted_tables_root = [m for m in root_list if m["name"].startswith("restricted_db/")] restricted_tables_root = [
m for m in root_list if m["name"].startswith("restricted_db/")
]
assert len(restricted_tables_root) == 2 assert len(restricted_tables_root) == 2
table_names = {m["name"] for m in restricted_tables_root} table_names = {m["name"] for m in restricted_tables_root}
assert "restricted_db/users" in table_names assert "restricted_db/users" in table_names
@ -535,7 +537,9 @@ async def test_tables_endpoint_config_database_allow():
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in alice_tables for t in alice_tables
] ]
restricted_tables_alice = [m for m in alice_list if m["name"].startswith("restricted_db/")] restricted_tables_alice = [
m for m in alice_list if m["name"].startswith("restricted_db/")
]
assert len(restricted_tables_alice) == 0 assert len(restricted_tables_alice) == 0
# But Alice should see public_db tables (no restrictions) # But Alice should see public_db tables (no restrictions)