Add keyset pagination to allowed_resources() (#2562)

* Add keyset pagination to allowed_resources()

This replaces the unbounded list return with PaginatedResources,
which supports efficient keyset pagination for handling thousands
of resources.

Closes #2560

Changes:
- allowed_resources() now returns PaginatedResources instead of list
- Added limit (1-1000, default 100) and next (keyset token) parameters
- Added include_reasons parameter (replaces allowed_resources_with_reasons)
- Removed allowed_resources_with_reasons() method entirely
- PaginatedResources.all() async generator for automatic pagination
- Uses tilde-encoding for tokens (matching table pagination)
- Updated all callers to use .resources accessor
- Updated documentation with new API and examples

The PaginatedResources object has:
- resources: List of Resource objects for current page
- next: Token for next page (None if no more results)
- all(): Async generator that yields all resources across pages

Example usage:
    page = await ds.allowed_resources("view-table", actor, limit=100)
    for table in page.resources:
        print(table.child)

    # Iterate all pages automatically
    async for table in page.all():
        print(table.child)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Simon Willison 2025-10-31 14:50:46 -07:00 committed by GitHub
commit 400fa08e4c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 366 additions and 223 deletions

View file

@ -71,6 +71,7 @@ from .url_builder import Urls
from .database import Database, QueryInterrupted
from .utils import (
PaginatedResources,
PrefixedUrlString,
SPATIALITE_FUNCTIONS,
StartupError,
@ -91,6 +92,7 @@ from .utils import (
resolve_env_secrets,
resolve_routes,
tilde_decode,
tilde_encode,
to_css_class,
urlsafe_components,
redact_keys,
@ -1147,104 +1149,147 @@ class Datasette:
*,
parent: str | None = None,
include_is_private: bool = False,
) -> list["Resource"]:
include_reasons: bool = False,
limit: int = 100,
next: str | None = None,
) -> PaginatedResources:
"""
Return all resources the actor can access for the given action.
Return paginated resources the actor can access for the given action.
Uses SQL to filter resources based on cascading permission rules.
Returns instances of the appropriate Resource subclass.
Uses SQL with keyset pagination to efficiently filter resources.
Returns PaginatedResources with list of Resource instances and pagination metadata.
Args:
action: The action name (e.g., "view-table")
actor: The actor dict (or None for unauthenticated)
parent: Optional parent filter (e.g., database name) to limit results
include_is_private: If True, adds a .private attribute to each Resource
include_reasons: If True, adds a .reasons attribute with List[str] of permission reasons
limit: Maximum number of results to return (1-1000, default 100)
next: Keyset token from previous page for pagination
Returns:
PaginatedResources with:
- resources: List of Resource objects for this page
- next: Token for next page (None if no more results)
Example:
# Get all tables
tables = await datasette.allowed_resources("view-table", actor)
for table in tables:
# Get first page of tables
page = await datasette.allowed_resources("view-table", actor, limit=50)
for table in page.resources:
print(f"{table.parent}/{table.child}")
# Get tables for specific database with private flag
tables = await datasette.allowed_resources(
"view-table", actor, parent="mydb", include_is_private=True
# Get next page
if page.next:
next_page = await datasette.allowed_resources(
"view-table", actor, limit=50, next=page.next
)
# With reasons for debugging
page = await datasette.allowed_resources(
"view-table", actor, include_reasons=True
)
for table in tables:
if table.private:
print(f"{table.child} is private")
for table in page.resources:
print(f"{table.child}: {table.reasons}")
# Iterate through all results with async generator
page = await datasette.allowed_resources("view-table", actor)
async for table in page.all():
print(table.child)
"""
action_obj = self.actions.get(action)
if not action_obj:
raise ValueError(f"Unknown action: {action}")
# Validate and cap limit
limit = min(max(1, limit), 1000)
# Get base SQL query
query, params = await self.allowed_resources_sql(
action=action,
actor=actor,
parent=parent,
include_is_private=include_is_private,
)
result = await self.get_internal_database().execute(query, params)
# Instantiate the appropriate Resource subclass for each row
# Add keyset pagination WHERE clause if next token provided
if next:
try:
components = urlsafe_components(next)
if len(components) >= 2:
last_parent, last_child = components[0], components[1]
# Keyset condition: (parent > last) OR (parent = last AND child > last)
keyset_where = """
(parent > :keyset_parent OR
(parent = :keyset_parent AND child > :keyset_child))
"""
# Wrap original query and add keyset filter
query = f"SELECT * FROM ({query}) WHERE {keyset_where}"
params["keyset_parent"] = last_parent
params["keyset_child"] = last_child
except (ValueError, KeyError):
# Invalid token - ignore and start from beginning
pass
# Add LIMIT (fetch limit+1 to detect if there are more results)
# Note: query from allowed_resources_sql() already includes ORDER BY parent, child
query = f"{query} LIMIT :limit"
params["limit"] = limit + 1
# Execute query
result = await self.get_internal_database().execute(query, params)
rows = list(result.rows)
# Check if truncated (got more than limit rows)
truncated = len(rows) > limit
if truncated:
rows = rows[:limit] # Remove the extra row
# Build Resource objects with optional attributes
resources = []
for row in result.rows:
# row[0]=parent, row[1]=child, row[2]=reason (ignored), row[3]=is_private (if requested)
for row in rows:
# row[0]=parent, row[1]=child, row[2]=reason, row[3]=is_private (if requested)
resource = self.resource_for_action(action, parent=row[0], child=row[1])
# Add reasons if requested
if include_reasons:
reason_json = row[2]
try:
reasons_array = (
json.loads(reason_json) if isinstance(reason_json, str) else []
)
resource.reasons = [r for r in reasons_array if r is not None]
except (json.JSONDecodeError, TypeError):
resource.reasons = [reason_json] if reason_json else []
# Add private flag if requested
if include_is_private:
resource.private = bool(row[3])
resources.append(resource)
return resources
# Generate next token if there are more results
next_token = None
if truncated and resources:
last_resource = resources[-1]
# Use tilde-encoding like table pagination
next_token = "{},{}".format(
tilde_encode(str(last_resource.parent)),
tilde_encode(str(last_resource.child)),
)
async def allowed_resources_with_reasons(
self,
action: str,
actor: dict | None = None,
) -> list["AllowedResource"]:
"""
Return allowed resources with permission reasons for debugging.
Uses SQL to filter resources and includes the reason each was allowed.
Returns list of AllowedResource named tuples with (resource, reason).
Example:
debug_info = await datasette.allowed_resources_with_reasons("view-table", actor)
for allowed in debug_info:
print(f"{allowed.resource}: {allowed.reason}")
"""
from datasette.permissions import AllowedResource
action_obj = self.actions.get(action)
if not action_obj:
raise ValueError(f"Unknown action: {action}")
query, params = await self.allowed_resources_sql(action=action, actor=actor)
result = await self.get_internal_database().execute(query, params)
resources = []
for row in result.rows:
resource = self.resource_for_action(action, parent=row[0], child=row[1])
reason_json = row[2]
# Parse JSON array of reasons and filter out nulls
try:
import json
reasons_array = (
json.loads(reason_json) if isinstance(reason_json, str) else []
)
reasons_filtered = [r for r in reasons_array if r is not None]
# Store as list for multiple reasons, or keep empty list
reason = reasons_filtered
except (json.JSONDecodeError, TypeError):
# Fallback for backward compatibility
reason = [reason_json] if reason_json else []
resources.append(AllowedResource(resource=resource, reason=reason))
return resources
return PaginatedResources(
resources=resources,
next=next_token,
_datasette=self,
_action=action,
_actor=actor,
_parent=parent,
_include_is_private=include_is_private,
_include_reasons=include_reasons,
_limit=limit,
)
async def allowed(
self,

View file

@ -16,6 +16,10 @@ class Resource(ABC):
name: str = None # e.g., "table", "database", "model"
parent_name: str | None = None # e.g., "database" for tables
# Instance-level optional extra attributes
reasons: list[str] | None = None
include_reasons: bool | None = None
def __init__(self, parent: str | None = None, child: str | None = None):
"""
Create a resource instance.

View file

@ -4,6 +4,7 @@ import aiofiles
import click
from collections import OrderedDict, namedtuple, Counter
import copy
import dataclasses
import base64
import hashlib
import inspect
@ -27,6 +28,58 @@ from .sqlite import sqlite3, supports_table_xinfo
if typing.TYPE_CHECKING:
from datasette.database import Database
from datasette.permissions import Resource
@dataclasses.dataclass
class PaginatedResources:
"""Paginated results from allowed_resources query."""
resources: List["Resource"]
next: str | None # Keyset token for next page (None if no more results)
_datasette: typing.Any = dataclasses.field(default=None, repr=False)
_action: str = dataclasses.field(default=None, repr=False)
_actor: typing.Any = dataclasses.field(default=None, repr=False)
_parent: str | None = dataclasses.field(default=None, repr=False)
_include_is_private: bool = dataclasses.field(default=False, repr=False)
_include_reasons: bool = dataclasses.field(default=False, repr=False)
_limit: int = dataclasses.field(default=100, repr=False)
async def all(self):
"""
Async generator that yields all resources across all pages.
Automatically handles pagination under the hood. This is useful when you need
to iterate through all results without manually managing pagination tokens.
Yields:
Resource objects one at a time
Example:
page = await datasette.allowed_resources("view-table", actor)
async for table in page.all():
print(f"{table.parent}/{table.child}")
"""
# Yield all resources from current page
for resource in self.resources:
yield resource
# Continue fetching subsequent pages if there are more
next_token = self.next
while next_token:
page = await self._datasette.allowed_resources(
self._action,
self._actor,
parent=self._parent,
include_is_private=self._include_is_private,
include_reasons=self._include_reasons,
limit=self._limit,
next=next_token,
)
for resource in page.resources:
yield resource
next_token = page.next
# From https://www.sqlite.org/lang_keywords.html
reserved_words = set(

View file

@ -70,12 +70,15 @@ class DatabaseView(View):
metadata = await datasette.get_database_metadata(database)
# Get all tables/views this actor can see in bulk with private flag
allowed_tables = await datasette.allowed_resources(
"view-table", request.actor, parent=database, include_is_private=True
allowed_tables_page = await datasette.allowed_resources(
"view-table",
request.actor,
parent=database,
include_is_private=True,
limit=1000,
)
# Create lookup dict for quick access
allowed_dict = {r.child: r for r in allowed_tables}
allowed_dict = {r.child: r for r in allowed_tables_page.resources}
# Filter to just views
view_names_set = set(await db.view_names())
@ -88,14 +91,18 @@ class DatabaseView(View):
tables = await get_tables(datasette, request, db, allowed_dict)
# Get allowed queries using the new permission system
allowed_query_resources = await datasette.allowed_resources(
"view-query", request.actor, parent=database, include_is_private=True
allowed_query_page = await datasette.allowed_resources(
"view-query",
request.actor,
parent=database,
include_is_private=True,
limit=1000,
)
# Build canned_queries list by looking up each allowed query
all_queries = await datasette.get_canned_queries(database, request.actor)
canned_queries = []
for query_resource in allowed_query_resources:
for query_resource in allowed_query_page.resources:
query_name = query_resource.child
if query_name in all_queries:
canned_queries.append(
@ -509,12 +516,15 @@ class QueryView(View):
database = db.name
# Get all tables/views this actor can see in bulk with private flag
allowed_tables = await datasette.allowed_resources(
"view-table", request.actor, parent=database, include_is_private=True
allowed_tables_page = await datasette.allowed_resources(
"view-table",
request.actor,
parent=database,
include_is_private=True,
limit=1000,
)
# Create lookup dict for quick access
allowed_dict = {r.child: r for r in allowed_tables}
allowed_dict = {r.child: r for r in allowed_tables_page.resources}
# Are we a canned query?
canned_query = None

View file

@ -28,17 +28,18 @@ class IndexView(BaseView):
await self.ds.ensure_permission(action="view-instance", actor=request.actor)
# Get all allowed databases and tables in bulk
allowed_databases = await self.ds.allowed_resources(
db_page = await self.ds.allowed_resources(
"view-database", request.actor, include_is_private=True
)
allowed_databases = [r async for r in db_page.all()]
allowed_db_dict = {r.parent: r for r in allowed_databases}
allowed_tables = await self.ds.allowed_resources(
# Group tables by database
tables_by_db = {}
table_page = await self.ds.allowed_resources(
"view-table", request.actor, include_is_private=True
)
# Group by database
tables_by_db = {}
for t in allowed_tables:
async for t in table_page.all():
if t.parent not in tables_by_db:
tables_by_db[t.parent] = {}
tables_by_db[t.parent][t.child] = t

View file

@ -268,19 +268,38 @@ class AllowedResourcesView(BaseView):
offset = (page - 1) * page_size
# Use the simplified allowed_resources method
# If user has debug permission, use the with_reasons variant
# Collect all resources with optional reasons for debugging
try:
if has_debug_permission:
allowed_resources = await self.ds.allowed_resources_with_reasons(
action=action,
actor=actor,
)
else:
allowed_resources = await self.ds.allowed_resources(
action=action,
actor=actor,
parent=parent_filter,
)
allowed_rows = []
result = await self.ds.allowed_resources(
action=action,
actor=actor,
parent=parent_filter,
include_reasons=has_debug_permission,
)
async for resource in result.all():
parent_val = resource.parent
child_val = resource.child
# Build resource path
if parent_val is None:
resource_path = "/"
elif child_val is None:
resource_path = f"/{parent_val}"
else:
resource_path = f"/{parent_val}/{child_val}"
row = {
"parent": parent_val,
"child": child_val,
"resource": resource_path,
}
# Add reason if we have it (from include_reasons=True)
if has_debug_permission and hasattr(resource, "reasons"):
row["reason"] = resource.reasons
allowed_rows.append(row)
except Exception:
# If catalog tables don't exist yet, return empty results
return (
@ -295,46 +314,6 @@ class AllowedResourcesView(BaseView):
200,
)
# Convert to list of dicts with resource path
allowed_rows = []
for item in allowed_resources:
# Extract resource and reason depending on what we got back
if has_debug_permission:
# allowed_resources_with_reasons returns AllowedResource(resource, reason)
resource = item.resource
reason = item.reason
else:
# allowed_resources returns plain Resource objects
resource = item
reason = None
parent_val = resource.parent
child_val = resource.child
# Apply parent filter if needed (when using with_reasons, we need to filter manually)
if parent_filter is not None and parent_val != parent_filter:
continue
# Build resource path
if parent_val is None:
resource_path = "/"
elif child_val is None:
resource_path = f"/{parent_val}"
else:
resource_path = f"/{parent_val}/{child_val}"
row = {
"parent": parent_val,
"child": child_val,
"resource": resource_path,
}
# Add reason if we have it (it's already a list from allowed_resources_with_reasons)
if reason is not None:
row["reason"] = reason
allowed_rows.append(row)
# Apply child filter if specified
if child_filter is not None:
allowed_rows = [row for row in allowed_rows if row["child"] == child_filter]
@ -652,10 +631,11 @@ class CreateTokenView(BaseView):
async def shared(self, request):
self.check_permission(request)
# Build list of databases and tables the user has permission to view
allowed_databases = await self.ds.allowed_resources(
"view-database", request.actor
)
allowed_tables = await self.ds.allowed_resources("view-table", request.actor)
db_page = await self.ds.allowed_resources("view-database", request.actor)
allowed_databases = [r async for r in db_page.all()]
table_page = await self.ds.allowed_resources("view-table", request.actor)
allowed_tables = [r async for r in table_page.all()]
# Build database -> tables mapping
database_with_tables = []

View file

@ -387,32 +387,80 @@ The method returns ``True`` if the permission is granted, ``False`` if denied.
.. _datasette_allowed_resources:
await .allowed_resources(action, actor=None, \*, parent=None, include_is_private=False)
---------------------------------------------------------------------------------------
await .allowed_resources(action, actor=None, \*, parent=None, include_is_private=False, include_reasons=False, limit=100, next=None)
------------------------------------------------------------------------------------------------------------------------------------
Returns a list of ``Resource`` objects that the actor can access for the
specified action. Each returned object is an instance of the action's
``resource_class`` and may include a ``.private`` attribute (when
``include_is_private=True``) to indicate that anonymous actors would be denied
access.
Returns a ``PaginatedResources`` object containing resources that the actor can access for the specified action, with support for keyset pagination.
Example::
``action`` - string
The action name (e.g., "view-table", "view-database")
tables = await datasette.allowed_resources(
"view-table", actor=request.actor, parent="fixtures"
``actor`` - dictionary, optional
The authenticated actor. Defaults to ``None`` for unauthenticated requests.
``parent`` - string, optional
Optional parent filter (e.g., database name) to limit results
``include_is_private`` - boolean, optional
If True, adds a ``.private`` attribute to each Resource indicating whether anonymous users can access it
``include_reasons`` - boolean, optional
If True, adds a ``.reasons`` attribute with a list of strings describing why access was granted (useful for debugging)
``limit`` - integer, optional
Maximum number of results to return per page (1-1000, default 100)
``next`` - string, optional
Keyset token from a previous page for pagination
The method returns a ``PaginatedResources`` object (from ``datasette.utils``) with the following attributes:
``resources`` - list
List of ``Resource`` objects for the current page
``next`` - string or None
Token for the next page, or ``None`` if no more results exist
Example usage:
.. code-block:: python
# Get first page of tables
page = await datasette.allowed_resources(
"view-table",
actor=request.actor,
parent="fixtures",
limit=50,
)
for table in tables:
for table in page.resources:
print(table.parent, table.child)
if hasattr(table, "private"):
print(f" Private: {table.private}")
# Get next page if available
if page.next:
next_page = await datasette.allowed_resources(
"view-table", actor=request.actor, next=page.next
)
# Iterate through all results automatically
page = await datasette.allowed_resources(
"view-table", actor=request.actor
)
async for table in page.all():
print(table.parent, table.child)
This method uses :ref:`datasette_allowed_resources_sql` under the hood and is an
efficient way to list the databases, tables or queries visible to a user.
# With reasons for debugging
page = await datasette.allowed_resources(
"view-table", actor=request.actor, include_reasons=True
)
for table in page.resources:
print(f"{table.child}: {table.reasons}")
.. _datasette_allowed_resources_with_reasons:
The ``page.all()`` async generator automatically handles pagination, fetching additional pages and yielding all resources one at a time.
await .allowed_resources_with_reasons(action, actor=None)
---------------------------------------------------------
Returns a list of :class:`datasette.permissions.AllowedResource` tuples. Each tuple contains a ``Resource`` plus a list of strings describing the rules that granted access. This powers the debugging data shown by the ``/-/allowed`` endpoint and is helpful when building administrative tooling that needs to show why access was granted.
This method uses :ref:`datasette_allowed_resources_sql` under the hood and is an efficient way to list the databases, tables or other resources that an actor can access for a specific action.
.. _datasette_allowed_resources_sql:

View file

@ -2,9 +2,9 @@
Tests for the new Resource-based permission system.
These tests verify:
1. The new Datasette.allowed_resources() method
1. The new Datasette.allowed_resources() method (with pagination)
2. The new Datasette.allowed() method
3. The new Datasette.allowed_resources_with_reasons() method
3. The include_reasons parameter for debugging
4. That SQL does the heavy lifting (no Python filtering)
"""
@ -71,7 +71,8 @@ async def test_allowed_resources_global_allow(test_ds):
try:
# Use the new allowed_resources() method
tables = await test_ds.allowed_resources("view-table", {"id": "alice"})
result = await test_ds.allowed_resources("view-table", {"id": "alice"})
tables = result.resources
# Alice should see all tables
assert len(tables) == 5
@ -133,9 +134,7 @@ async def test_allowed_specific_resource(test_ds):
@pytest.mark.asyncio
async def test_allowed_resources_with_reasons(test_ds):
"""Test allowed_resources_with_reasons() exposes debugging info"""
async def test_allowed_resources_include_reasons(test_ds):
def rules_callback(datasette, actor, action):
if actor and actor.get("role") == "analyst":
sql = """
@ -152,21 +151,22 @@ async def test_allowed_resources_with_reasons(test_ds):
pm.register(plugin, name="test_plugin")
try:
# Use allowed_resources_with_reasons to get debugging info
allowed = await test_ds.allowed_resources_with_reasons(
"view-table", {"id": "bob", "role": "analyst"}
# Use allowed_resources with include_reasons to get debugging info
result = await test_ds.allowed_resources(
"view-table", {"id": "bob", "role": "analyst"}, include_reasons=True
)
allowed = result.resources
# Should get analytics tables except sensitive
assert len(allowed) >= 2 # At least users and events
# Check we can access both resource and reason
for item in allowed:
assert isinstance(item.resource, TableResource)
assert isinstance(item.reason, list)
if item.resource.parent == "analytics":
for resource in allowed:
assert isinstance(resource, TableResource)
assert isinstance(resource.reasons, list)
if resource.parent == "analytics":
# Should mention parent-level reason in at least one of the reasons
reasons_text = " ".join(item.reason).lower()
reasons_text = " ".join(resource.reasons).lower()
assert "analyst access" in reasons_text
finally:
@ -194,7 +194,8 @@ async def test_child_deny_overrides_parent_allow(test_ds):
try:
actor = {"id": "bob", "role": "analyst"}
tables = await test_ds.allowed_resources("view-table", actor)
result = await test_ds.allowed_resources("view-table", actor)
tables = result.resources
# Should see analytics tables except sensitive
analytics_tables = [t for t in tables if t.parent == "analytics"]
@ -242,7 +243,8 @@ async def test_child_allow_overrides_parent_deny(test_ds):
try:
actor = {"id": "carol"}
tables = await test_ds.allowed_resources("view-table", actor)
result = await test_ds.allowed_resources("view-table", actor)
tables = result.resources
# Should only see production.orders
production_tables = [t for t in tables if t.parent == "production"]
@ -305,7 +307,8 @@ async def test_sql_does_filtering_not_python(test_ds):
)
# allowed_resources() should also use SQL filtering
tables = await test_ds.allowed_resources("view-table", actor)
result = await test_ds.allowed_resources("view-table", actor)
tables = result.resources
assert len(tables) == 1
assert tables[0].parent == "analytics"
assert tables[0].child == "users"

View file

@ -66,7 +66,7 @@ async def test_tables_endpoint_global_access(test_ds):
try:
# Use the allowed_resources API directly
tables = await test_ds.allowed_resources("view-table", {"id": "alice"})
page = await test_ds.allowed_resources("view-table", {"id": "alice"})
# Convert to the format the endpoint returns
result = [
@ -74,7 +74,7 @@ async def test_tables_endpoint_global_access(test_ds):
"name": f"{t.parent}/{t.child}",
"url": test_ds.urls.table(t.parent, t.child),
}
for t in tables
for t in page.resources
]
# Alice should see all tables
@ -105,7 +105,7 @@ async def test_tables_endpoint_database_restriction(test_ds):
pm.register(plugin, name="test_plugin")
try:
tables = await test_ds.allowed_resources(
page = await test_ds.allowed_resources(
"view-table", {"id": "bob", "role": "analyst"}
)
result = [
@ -113,7 +113,7 @@ async def test_tables_endpoint_database_restriction(test_ds):
"name": f"{t.parent}/{t.child}",
"url": test_ds.urls.table(t.parent, t.child),
}
for t in tables
for t in page.resources
]
# Bob should only see analytics tables
@ -152,13 +152,13 @@ async def test_tables_endpoint_table_exception(test_ds):
pm.register(plugin, name="test_plugin")
try:
tables = await test_ds.allowed_resources("view-table", {"id": "carol"})
page = await test_ds.allowed_resources("view-table", {"id": "carol"})
result = [
{
"name": f"{t.parent}/{t.child}",
"url": test_ds.urls.table(t.parent, t.child),
}
for t in tables
for t in page.resources
]
# Carol should see analytics.users but not other analytics tables
@ -194,7 +194,7 @@ async def test_tables_endpoint_deny_overrides_allow(test_ds):
pm.register(plugin, name="test_plugin")
try:
tables = await test_ds.allowed_resources(
page = await test_ds.allowed_resources(
"view-table", {"id": "bob", "role": "analyst"}
)
result = [
@ -202,7 +202,7 @@ async def test_tables_endpoint_deny_overrides_allow(test_ds):
"name": f"{t.parent}/{t.child}",
"url": test_ds.urls.table(t.parent, t.child),
}
for t in tables
for t in page.resources
]
analytics_tables = [m for m in result if m["name"].startswith("analytics/")]
@ -230,10 +230,10 @@ async def test_tables_endpoint_no_permissions():
await ds._refresh_schemas()
# Unknown actor with no custom permissions
tables = await ds.allowed_resources("view-table", {"id": "unknown"})
page = await ds.allowed_resources("view-table", {"id": "unknown"})
result = [
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in tables
for t in page.resources
]
# Should see tables (due to default_permissions.py providing default allow)
@ -260,13 +260,13 @@ async def test_tables_endpoint_specific_table_only(test_ds):
pm.register(plugin, name="test_plugin")
try:
tables = await test_ds.allowed_resources("view-table", {"id": "dave"})
page = await test_ds.allowed_resources("view-table", {"id": "dave"})
result = [
{
"name": f"{t.parent}/{t.child}",
"url": test_ds.urls.table(t.parent, t.child),
}
for t in tables
for t in page.resources
]
# Should see only the two specifically allowed tables
@ -298,13 +298,13 @@ async def test_tables_endpoint_empty_result(test_ds):
pm.register(plugin, name="test_plugin")
try:
tables = await test_ds.allowed_resources("view-table", {"id": "blocked"})
page = await test_ds.allowed_resources("view-table", {"id": "blocked"})
result = [
{
"name": f"{t.parent}/{t.child}",
"url": test_ds.urls.table(t.parent, t.child),
}
for t in tables
for t in page.resources
]
# Global deny should block access to all tables
@ -328,11 +328,11 @@ async def test_tables_endpoint_no_query_returns_all():
await ds._refresh_schemas()
# Get all tables without query
all_tables = await ds.allowed_resources("view-table", None)
page = await ds.allowed_resources("view-table", None)
# Should return all tables with truncated: false
assert len(all_tables) >= 3
table_names = {f"{t.parent}/{t.child}" for t in all_tables}
assert len(page.resources) >= 3
table_names = {f"{t.parent}/{t.child}" for t in page.resources}
assert "test_db/users" in table_names
assert "test_db/posts" in table_names
assert "test_db/comments" in table_names
@ -350,12 +350,13 @@ async def test_tables_endpoint_truncation():
await db.execute_write(f"CREATE TABLE table_{i:03d} (id INTEGER)")
await ds._refresh_schemas()
# Get all tables - should be truncated
all_tables = await ds.allowed_resources("view-table", None)
big_db_tables = [t for t in all_tables if t.parent == "big_db"]
# Get all tables - should be paginated with limit=100 by default
page = await ds.allowed_resources("view-table", None)
big_db_tables = [t for t in page.resources if t.parent == "big_db"]
# Should have exactly 105 tables in the database
assert len(big_db_tables) == 105
# Should have exactly 100 tables in first page (default limit)
assert len(big_db_tables) == 100
assert page.next is not None # More results available
@pytest.mark.asyncio
@ -374,10 +375,10 @@ async def test_tables_endpoint_search_single_term():
await ds._refresh_schemas()
# Get all tables in the new format
all_tables = await ds.allowed_resources("view-table", None)
page = await ds.allowed_resources("view-table", None)
matches = [
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in all_tables
for t in page.resources
]
# Filter for "user" (extract table name from "db/table")
@ -411,10 +412,10 @@ async def test_tables_endpoint_search_multiple_terms():
await ds._refresh_schemas()
# Get all tables in the new format
all_tables = await ds.allowed_resources("view-table", None)
page = await ds.allowed_resources("view-table", None)
matches = [
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in all_tables
for t in page.resources
]
# Filter for "user profile" (two terms, extract table name from "db/table")
@ -453,10 +454,10 @@ async def test_tables_endpoint_search_ordering():
await ds._refresh_schemas()
# Get all tables in the new format
all_tables = await ds.allowed_resources("view-table", None)
page = await ds.allowed_resources("view-table", None)
matches = [
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in all_tables
for t in page.resources
]
# Filter for "user" and sort by table name length
@ -490,10 +491,10 @@ async def test_tables_endpoint_search_case_insensitive():
await ds._refresh_schemas()
# Get all tables in the new format
all_tables = await ds.allowed_resources("view-table", None)
page = await ds.allowed_resources("view-table", None)
matches = [
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in all_tables
for t in page.resources
]
# Filter for "user" (lowercase) should match all case variants
@ -525,10 +526,10 @@ async def test_tables_endpoint_search_no_matches():
await ds._refresh_schemas()
# Get all tables in the new format
all_tables = await ds.allowed_resources("view-table", None)
page = await ds.allowed_resources("view-table", None)
matches = [
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in all_tables
for t in page.resources
]
# Filter for "zzz" which doesn't exist
@ -563,10 +564,10 @@ async def test_tables_endpoint_config_database_allow():
await ds._refresh_schemas()
# Root user should see restricted_db tables
root_tables = await ds.allowed_resources("view-table", {"id": "root"})
root_page = await ds.allowed_resources("view-table", {"id": "root"})
root_list = [
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in root_tables
for t in root_page.resources
]
restricted_tables_root = [
m for m in root_list if m["name"].startswith("restricted_db/")
@ -577,10 +578,10 @@ async def test_tables_endpoint_config_database_allow():
assert "restricted_db/posts" in table_names
# Alice should NOT see restricted_db tables
alice_tables = await ds.allowed_resources("view-table", {"id": "alice"})
alice_page = await ds.allowed_resources("view-table", {"id": "alice"})
alice_list = [
{"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)}
for t in alice_tables
for t in alice_page.resources
]
restricted_tables_alice = [
m for m in alice_list if m["name"].startswith("restricted_db/")

View file

@ -1327,14 +1327,14 @@ async def test_actor_restrictions_filters_allowed_resources(perms_ds):
actor = {"id": "user", "_r": {"r": {"perms_ds_one": {"t1": ["vt"]}}}}
# Should only return t1
allowed_tables = await perms_ds.allowed_resources("view-table", actor)
assert len(allowed_tables) == 1
assert allowed_tables[0].parent == "perms_ds_one"
assert allowed_tables[0].child == "t1"
page = await perms_ds.allowed_resources("view-table", actor)
assert len(page.resources) == 1
assert page.resources[0].parent == "perms_ds_one"
assert page.resources[0].child == "t1"
# Database listing should be empty (no view-database permission)
allowed_dbs = await perms_ds.allowed_resources("view-database", actor)
assert len(allowed_dbs) == 0
db_page = await perms_ds.allowed_resources("view-database", actor)
assert len(db_page.resources) == 0
@pytest.mark.asyncio
@ -1343,12 +1343,10 @@ async def test_actor_restrictions_database_level(perms_ds):
actor = {"id": "user", "_r": {"d": {"perms_ds_one": ["vt"]}}}
allowed_tables = await perms_ds.allowed_resources(
"view-table", actor, parent="perms_ds_one"
)
page = await perms_ds.allowed_resources("view-table", actor, parent="perms_ds_one")
# Should return all tables in perms_ds_one
table_names = {r.child for r in allowed_tables}
table_names = {r.child for r in page.resources}
assert "t1" in table_names
assert "t2" in table_names
assert "v1" in table_names # views too
@ -1360,11 +1358,11 @@ async def test_actor_restrictions_global_level(perms_ds):
actor = {"id": "user", "_r": {"a": ["vt"]}}
allowed_tables = await perms_ds.allowed_resources("view-table", actor)
page = await perms_ds.allowed_resources("view-table", actor)
# Should return all tables in all databases
assert len(allowed_tables) > 0
dbs = {r.parent for r in allowed_tables}
assert len(page.resources) > 0
dbs = {r.parent for r in page.resources}
assert "perms_ds_one" in dbs
assert "perms_ds_two" in dbs
@ -1430,8 +1428,8 @@ async def test_actor_restrictions_view_instance_only(perms_ds):
data = response.json()
# The instance is visible but databases list should be empty or minimal
# Actually, let's check via allowed_resources
allowed_dbs = await perms_ds.allowed_resources("view-database", actor)
assert len(allowed_dbs) == 0
page = await perms_ds.allowed_resources("view-database", actor)
assert len(page.resources) == 0
@pytest.mark.asyncio
@ -1441,11 +1439,11 @@ async def test_actor_restrictions_empty_allowlist(perms_ds):
actor = {"id": "user", "_r": {}}
# No actions in allowlist, so everything should be denied
allowed_tables = await perms_ds.allowed_resources("view-table", actor)
assert len(allowed_tables) == 0
page1 = await perms_ds.allowed_resources("view-table", actor)
assert len(page1.resources) == 0
allowed_dbs = await perms_ds.allowed_resources("view-database", actor)
assert len(allowed_dbs) == 0
page2 = await perms_ds.allowed_resources("view-database", actor)
assert len(page2.resources) == 0
result = await perms_ds.allowed(action="view-instance", actor=actor)
assert result is False