Implement resource-based permission system with SQL-driven access control

This introduces a new hierarchical permission system that uses SQL queries
for efficient permission checking across resources. The system replaces the
older permission_allowed() pattern with a more flexible resource-based
approach.

Core changes:

- New Resource ABC and Action dataclass in datasette/permissions.py
  * Resources represent hierarchical entities (instance, database, table)
  * Each resource type implements resources_sql() to list all instances
  * Actions define operations on resources with cascading rules

- New plugin hook: register_actions(datasette)
  * Plugins register actions with their associated resource types
  * Replaces register_permissions() and register_resource_types()
  * See docs/plugin_hooks.rst for full documentation

- Three new Datasette methods for permission checks:
  * allowed_resources(action, actor) - returns list[Resource]
  * allowed_resources_with_reasons(action, actor) - for debugging
  * allowed(action, resource, actor) - checks single resource
  * All use SQL for filtering, never Python iteration

- New /-/tables endpoint (TablesView)
  * Returns JSON list of tables user can view
  * Supports ?q= parameter for regex filtering
  * Format: {"matches": [{"name": "db/table", "url": "/db/table"}]}
  * Respects all permission rules from configuration and plugins

- SQL-based permission evaluation (datasette/utils/actions_sql.py)
  * Cascading rules: child-level → parent-level → global-level
  * DENY beats ALLOW at same specificity
  * Uses CTEs for efficient SQL-only filtering
  * Combines permission_resources_sql() hook results

- Default actions in datasette/default_actions.py
  * InstanceResource, DatabaseResource, TableResource, QueryResource
  * Core actions: view-instance, view-database, view-table, etc.

- Fixed default_permissions.py to handle database-level allow blocks
  * Now creates parent-level rules for view-table action
  * Fixes: datasette ... -s databases.fixtures.allow.id root

Documentation:

- Comprehensive register_actions() hook documentation
- Detailed resources_sql() method explanation
- /-/tables endpoint documentation in docs/introspection.rst
- Deprecated register_permissions() with migration guide

Tests:

- tests/test_actions_sql.py: 7 tests for core permission API
- tests/test_tables_endpoint.py: 13 tests for /-/tables endpoint
- All 118 documentation tests pass
- Tests verify SQL does filtering (not Python)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Simon Willison 2025-10-20 15:59:37 -07:00
commit 7db754c284
14 changed files with 2185 additions and 2 deletions

View file

@ -52,6 +52,7 @@ from .views.special import (
AllowedResourcesView,
PermissionRulesView,
PermissionCheckView,
TablesView,
)
from .views.table import (
TableInsertView,
@ -308,6 +309,7 @@ class Datasette:
self.immutables = set(immutables or [])
self.databases = collections.OrderedDict()
self.permissions = {} # .invoke_startup() will populate this
self.actions = {} # .invoke_startup() will populate this
try:
self._refresh_schemas_lock = asyncio.Lock()
except RuntimeError as rex:
@ -589,6 +591,33 @@ class Datasette:
if p.abbr:
abbrs[p.abbr] = p
self.permissions[p.name] = p
# Register actions, but watch out for duplicate name/abbr
action_names = {}
action_abbrs = {}
for hook in pm.hook.register_actions(datasette=self):
if hook:
for action in hook:
if (
action.name in action_names
and action != action_names[action.name]
):
raise StartupError(
"Duplicate action name: {}".format(action.name)
)
if (
action.abbr
and action.abbr in action_abbrs
and action != action_abbrs[action.abbr]
):
raise StartupError(
"Duplicate action abbr: {}".format(action.abbr)
)
action_names[action.name] = action
if action.abbr:
action_abbrs[action.abbr] = action
self.actions[action.name] = action
for hook in pm.hook.prepare_jinja2_environment(
env=self._jinja_env, datasette=self
):
@ -1242,6 +1271,107 @@ class Datasette:
# It's visible to everyone
return True, False
async def allowed_resources(
self,
action: str,
actor: dict | None = None,
) -> list["Resource"]:
"""
Return all resources the actor can access for the given action.
Uses SQL to filter resources based on cascading permission rules.
Returns instances of the appropriate Resource subclass.
Example:
tables = await datasette.allowed_resources("view-table", actor)
for table in tables:
print(f"{table.parent}/{table.child}")
"""
from datasette.utils.actions_sql import build_allowed_resources_sql
from datasette.permissions import Resource
action_obj = self.actions.get(action)
if not action_obj:
raise ValueError(f"Unknown action: {action}")
query, params = await build_allowed_resources_sql(self, actor, action)
result = await self.get_internal_database().execute(query, params)
# Instantiate the appropriate Resource subclass for each row
resource_class = action_obj.resource_class
resources = []
for row in result.rows:
# row[0]=parent, row[1]=child, row[2]=reason (ignored)
# Create instance directly with parent/child from base class
resource = object.__new__(resource_class)
Resource.__init__(resource, parent=row[0], child=row[1])
resources.append(resource)
return resources
async def allowed_resources_with_reasons(
self,
action: str,
actor: dict | None = None,
) -> list["AllowedResource"]:
"""
Return allowed resources with permission reasons for debugging.
Uses SQL to filter resources and includes the reason each was allowed.
Returns list of AllowedResource named tuples with (resource, reason).
Example:
debug_info = await datasette.allowed_resources_with_reasons("view-table", actor)
for allowed in debug_info:
print(f"{allowed.resource}: {allowed.reason}")
"""
from datasette.utils.actions_sql import build_allowed_resources_sql
from datasette.permissions import AllowedResource, Resource
action_obj = self.actions.get(action)
if not action_obj:
raise ValueError(f"Unknown action: {action}")
query, params = await build_allowed_resources_sql(self, actor, action)
result = await self.get_internal_database().execute(query, params)
resource_class = action_obj.resource_class
resources = []
for row in result.rows:
# Create instance directly with parent/child from base class
resource = object.__new__(resource_class)
Resource.__init__(resource, parent=row[0], child=row[1])
reason = row[2]
resources.append(AllowedResource(resource=resource, reason=reason))
return resources
async def allowed(
self,
action: str,
resource: "Resource",
actor: dict | None = None,
) -> bool:
"""
Check if actor can perform action on specific resource.
Uses SQL to check permission for a single resource without fetching all resources.
This is efficient - it does NOT call allowed_resources() and check membership.
Example:
from datasette.default_actions import TableResource
can_view = await datasette.allowed(
"view-table",
TableResource(database="analytics", table="users"),
actor
)
"""
from datasette.utils.actions_sql import check_permission_for_resource
return await check_permission_for_resource(
self, actor, action, resource.parent, resource.child
)
async def execute(
self,
db_name,
@ -1726,6 +1856,10 @@ class Datasette:
ApiExplorerView.as_view(self),
r"/-/api$",
)
add_route(
TablesView.as_view(self),
r"/-/tables$",
)
add_route(
LogoutView.as_view(self),
r"/-/logout$",

View file

@ -0,0 +1,189 @@
from datasette import hookimpl
from datasette.permissions import Action, Resource
from typing import Optional
class InstanceResource(Resource):
"""The Datasette instance itself."""
name = "instance"
parent_name = None
def __init__(self):
super().__init__(parent=None, child=None)
@classmethod
def resources_sql(cls) -> str:
return "SELECT NULL AS parent, NULL AS child"
class DatabaseResource(Resource):
"""A database in Datasette."""
name = "database"
parent_name = "instance"
def __init__(self, database: str):
super().__init__(parent=database, child=None)
@classmethod
def resources_sql(cls) -> str:
return """
SELECT database_name AS parent, NULL AS child
FROM catalog_databases
"""
class TableResource(Resource):
"""A table in a database."""
name = "table"
parent_name = "database"
def __init__(self, database: str, table: str):
super().__init__(parent=database, child=table)
@classmethod
def resources_sql(cls) -> str:
return """
SELECT database_name AS parent, table_name AS child
FROM catalog_tables
"""
class QueryResource(Resource):
"""A canned query in a database."""
name = "query"
parent_name = "database"
def __init__(self, database: str, query: str):
super().__init__(parent=database, child=query)
@classmethod
def resources_sql(cls) -> str:
# TODO: Need catalog for queries
return "SELECT NULL AS parent, NULL AS child WHERE 0"
@hookimpl
def register_actions():
"""Register the core Datasette actions."""
return (
# View actions
Action(
name="view-instance",
abbr="vi",
description="View Datasette instance",
takes_parent=False,
takes_child=False,
resource_class=InstanceResource,
),
Action(
name="view-database",
abbr="vd",
description="View database",
takes_parent=True,
takes_child=False,
resource_class=DatabaseResource,
),
Action(
name="view-database-download",
abbr="vdd",
description="Download database file",
takes_parent=True,
takes_child=False,
resource_class=DatabaseResource,
),
Action(
name="view-table",
abbr="vt",
description="View table",
takes_parent=True,
takes_child=True,
resource_class=TableResource,
),
Action(
name="view-query",
abbr="vq",
description="View named query results",
takes_parent=True,
takes_child=True,
resource_class=QueryResource,
),
Action(
name="execute-sql",
abbr="es",
description="Execute read-only SQL queries",
takes_parent=True,
takes_child=False,
resource_class=DatabaseResource,
),
# Debug actions
Action(
name="permissions-debug",
abbr="pd",
description="Access permission debug tool",
takes_parent=False,
takes_child=False,
resource_class=InstanceResource,
),
Action(
name="debug-menu",
abbr="dm",
description="View debug menu items",
takes_parent=False,
takes_child=False,
resource_class=InstanceResource,
),
# Write actions on tables
Action(
name="insert-row",
abbr="ir",
description="Insert rows",
takes_parent=True,
takes_child=True,
resource_class=TableResource,
),
Action(
name="delete-row",
abbr="dr",
description="Delete rows",
takes_parent=True,
takes_child=True,
resource_class=TableResource,
),
Action(
name="update-row",
abbr="ur",
description="Update rows",
takes_parent=True,
takes_child=True,
resource_class=TableResource,
),
Action(
name="alter-table",
abbr="at",
description="Alter tables",
takes_parent=True,
takes_child=True,
resource_class=TableResource,
),
Action(
name="drop-table",
abbr="dt",
description="Drop tables",
takes_parent=True,
takes_child=True,
resource_class=TableResource,
),
# Schema actions on databases
Action(
name="create-table",
abbr="ct",
description="Create tables",
takes_parent=True,
takes_child=False,
resource_class=DatabaseResource,
),
)

View file

@ -289,6 +289,13 @@ async def _config_permission_rules(datasette, actor, action) -> list[PluginSQL]:
db_allow_sql = db_config.get("allow_sql")
add_row(db_name, None, evaluate(db_allow_sql), f"allow_sql for {db_name}")
if action == "view-table":
# Database-level allow block affects all tables in that database
db_allow = db_config.get("allow")
add_row(
db_name, None, evaluate(db_allow), f"allow for {action} on {db_name}"
)
if action == "view-instance":
allow_block = config.get("allow")
add_row(None, None, evaluate(allow_block), "allow for view-instance")
@ -325,7 +332,6 @@ async def _config_permission_rules(datasette, actor, action) -> list[PluginSQL]:
params[f"{key}_reason"] = reason
sql = "\nUNION ALL\n".join(parts)
print(sql, params)
return [PluginSQL(source="config_permissions", sql=sql, params=params)]

View file

@ -74,6 +74,11 @@ def register_permissions(datasette):
"""Register permissions: returns a list of datasette.permission.Permission named tuples"""
@hookspec
def register_actions(datasette):
"""Register actions: returns a list of datasette.permission.Action objects"""
@hookspec
def register_routes(datasette):
"""Register URL routes: return a list of (regex, view_function) pairs"""

View file

@ -1,7 +1,92 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
from typing import Optional, NamedTuple
class Resource(ABC):
"""
Base class for all resource types.
Each subclass represents a type of resource (e.g., TableResource, DatabaseResource).
The class itself carries metadata about the resource type.
Instances represent specific resources.
"""
# Class-level metadata (subclasses must define these)
name: str = None # e.g., "table", "database", "model"
parent_name: Optional[str] = None # e.g., "database" for tables
def __init__(self, parent: Optional[str] = None, child: Optional[str] = None):
"""
Create a resource instance.
Args:
parent: The parent identifier (meaning depends on resource type)
child: The child identifier (meaning depends on resource type)
"""
self.parent = parent
self.child = child
@classmethod
@abstractmethod
def resources_sql(cls) -> str:
"""
Return SQL query that returns all resources of this type.
Must return two columns: parent, child
"""
pass
def __str__(self) -> str:
if self.parent is None and self.child is None:
return f"{self.name}:*"
elif self.child is None:
return f"{self.name}:{self.parent}"
else:
return f"{self.name}:{self.parent}/{self.child}"
def __repr__(self) -> str:
parts = [f"{self.__class__.__name__}("]
args = []
if self.parent:
args.append(f"{self.parent!r}")
if self.child:
args.append(f"{self.child!r}")
parts.append(", ".join(args))
parts.append(")")
return "".join(parts)
def __eq__(self, other):
if not isinstance(other, Resource):
return False
return (
self.__class__ == other.__class__
and self.parent == other.parent
and self.child == other.child
)
def __hash__(self):
return hash((self.__class__, self.parent, self.child))
class AllowedResource(NamedTuple):
"""A resource with the reason it was allowed (for debugging)."""
resource: Resource
reason: str
@dataclass(frozen=True)
class Action:
name: str
abbr: str | None
description: str | None
takes_parent: bool
takes_child: bool
resource_class: type[Resource]
# This is obsolete, replaced by Action and ResourceType
@dataclass
class Permission:
name: str

View file

@ -23,6 +23,7 @@ DEFAULT_PLUGINS = (
"datasette.sql_functions",
"datasette.actor_auth_cookie",
"datasette.default_permissions",
"datasette.default_actions",
"datasette.default_magic_parameters",
"datasette.blob_renderer",
"datasette.default_menu_links",

View file

@ -0,0 +1,401 @@
class NavigationSearch extends HTMLElement {
constructor() {
super();
this.attachShadow({ mode: 'open' });
this.selectedIndex = -1;
this.matches = [];
this.debounceTimer = null;
this.render();
this.setupEventListeners();
}
render() {
this.shadowRoot.innerHTML = `
<style>
:host {
display: contents;
}
dialog {
border: none;
border-radius: 0.75rem;
padding: 0;
max-width: 90vw;
width: 600px;
max-height: 80vh;
box-shadow: 0 20px 25px -5px rgba(0, 0, 0, 0.1), 0 10px 10px -5px rgba(0, 0, 0, 0.04);
animation: slideIn 0.2s ease-out;
}
dialog::backdrop {
background: rgba(0, 0, 0, 0.5);
backdrop-filter: blur(4px);
animation: fadeIn 0.2s ease-out;
}
@keyframes slideIn {
from {
opacity: 0;
transform: translateY(-20px) scale(0.95);
}
to {
opacity: 1;
transform: translateY(0) scale(1);
}
}
@keyframes fadeIn {
from { opacity: 0; }
to { opacity: 1; }
}
.search-container {
display: flex;
flex-direction: column;
height: 100%;
}
.search-input-wrapper {
padding: 1.25rem;
border-bottom: 1px solid #e5e7eb;
}
.search-input {
width: 100%;
padding: 0.75rem 1rem;
font-size: 1rem;
border: 2px solid #e5e7eb;
border-radius: 0.5rem;
outline: none;
transition: border-color 0.2s;
box-sizing: border-box;
}
.search-input:focus {
border-color: #2563eb;
}
.results-container {
overflow-y: auto;
height: calc(80vh - 180px);
padding: 0.5rem;
}
.result-item {
padding: 0.875rem 1rem;
cursor: pointer;
border-radius: 0.5rem;
transition: background-color 0.15s;
display: flex;
align-items: center;
gap: 0.75rem;
}
.result-item:hover {
background-color: #f3f4f6;
}
.result-item.selected {
background-color: #dbeafe;
}
.result-name {
font-weight: 500;
color: #111827;
}
.result-url {
font-size: 0.875rem;
color: #6b7280;
}
.no-results {
padding: 2rem;
text-align: center;
color: #6b7280;
}
.hint-text {
padding: 0.75rem 1.25rem;
font-size: 0.875rem;
color: #6b7280;
border-top: 1px solid #e5e7eb;
display: flex;
gap: 1rem;
flex-wrap: wrap;
}
.hint-text kbd {
background: #f3f4f6;
padding: 0.125rem 0.375rem;
border-radius: 0.25rem;
font-size: 0.75rem;
border: 1px solid #d1d5db;
font-family: monospace;
}
/* Mobile optimizations */
@media (max-width: 640px) {
dialog {
width: 95vw;
max-height: 85vh;
border-radius: 0.5rem;
}
.search-input-wrapper {
padding: 1rem;
}
.search-input {
font-size: 16px; /* Prevents zoom on iOS */
}
.result-item {
padding: 1rem 0.75rem;
}
.hint-text {
font-size: 0.8rem;
padding: 0.5rem 1rem;
}
}
</style>
<dialog>
<div class="search-container">
<div class="search-input-wrapper">
<input
type="text"
class="search-input"
placeholder="Search..."
aria-label="Search navigation"
autocomplete="off"
spellcheck="false"
>
</div>
<div class="results-container" role="listbox"></div>
<div class="hint-text">
<span><kbd></kbd> <kbd></kbd> Navigate</span>
<span><kbd>Enter</kbd> Select</span>
<span><kbd>Esc</kbd> Close</span>
</div>
</div>
</dialog>
`;
}
setupEventListeners() {
const dialog = this.shadowRoot.querySelector('dialog');
const input = this.shadowRoot.querySelector('.search-input');
const resultsContainer = this.shadowRoot.querySelector('.results-container');
// Global keyboard listener for "/"
document.addEventListener('keydown', (e) => {
if (e.key === '/' && !this.isInputFocused() && !dialog.open) {
e.preventDefault();
this.openMenu();
}
});
// Input event
input.addEventListener('input', (e) => {
this.handleSearch(e.target.value);
});
// Keyboard navigation
input.addEventListener('keydown', (e) => {
if (e.key === 'ArrowDown') {
e.preventDefault();
this.moveSelection(1);
} else if (e.key === 'ArrowUp') {
e.preventDefault();
this.moveSelection(-1);
} else if (e.key === 'Enter') {
e.preventDefault();
this.selectCurrentItem();
} else if (e.key === 'Escape') {
this.closeMenu();
}
});
// Click on result item
resultsContainer.addEventListener('click', (e) => {
const item = e.target.closest('.result-item');
if (item) {
const index = parseInt(item.dataset.index);
this.selectItem(index);
}
});
// Close on backdrop click
dialog.addEventListener('click', (e) => {
if (e.target === dialog) {
this.closeMenu();
}
});
// Initial load
this.loadInitialData();
}
isInputFocused() {
const activeElement = document.activeElement;
return activeElement && (
activeElement.tagName === 'INPUT' ||
activeElement.tagName === 'TEXTAREA' ||
activeElement.isContentEditable
);
}
loadInitialData() {
const itemsAttr = this.getAttribute('items');
if (itemsAttr) {
try {
this.allItems = JSON.parse(itemsAttr);
this.matches = this.allItems;
} catch (e) {
console.error('Failed to parse items attribute:', e);
this.allItems = [];
this.matches = [];
}
}
}
handleSearch(query) {
clearTimeout(this.debounceTimer);
this.debounceTimer = setTimeout(() => {
const url = this.getAttribute('url');
if (url) {
// Fetch from API
this.fetchResults(url, query);
} else {
// Filter local items
this.filterLocalItems(query);
}
}, 200);
}
async fetchResults(url, query) {
try {
const searchUrl = `${url}?q=${encodeURIComponent(query)}`;
const response = await fetch(searchUrl);
const data = await response.json();
this.matches = data.matches || [];
this.selectedIndex = this.matches.length > 0 ? 0 : -1;
this.renderResults();
} catch (e) {
console.error('Failed to fetch search results:', e);
this.matches = [];
this.renderResults();
}
}
filterLocalItems(query) {
if (!query.trim()) {
this.matches = [];
} else {
const lowerQuery = query.toLowerCase();
this.matches = (this.allItems || []).filter(item =>
item.name.toLowerCase().includes(lowerQuery) ||
item.url.toLowerCase().includes(lowerQuery)
);
}
this.selectedIndex = this.matches.length > 0 ? 0 : -1;
this.renderResults();
}
renderResults() {
const container = this.shadowRoot.querySelector('.results-container');
const input = this.shadowRoot.querySelector('.search-input');
if (this.matches.length === 0) {
const message = input.value.trim() ? 'No results found' : 'Start typing to search...';
container.innerHTML = `<div class="no-results">${message}</div>`;
return;
}
container.innerHTML = this.matches.map((match, index) => `
<div
class="result-item ${index === this.selectedIndex ? 'selected' : ''}"
data-index="${index}"
role="option"
aria-selected="${index === this.selectedIndex}"
>
<div>
<div class="result-name">${this.escapeHtml(match.name)}</div>
<div class="result-url">${this.escapeHtml(match.url)}</div>
</div>
</div>
`).join('');
// Scroll selected item into view
if (this.selectedIndex >= 0) {
const selectedItem = container.children[this.selectedIndex];
if (selectedItem) {
selectedItem.scrollIntoView({ block: 'nearest' });
}
}
}
moveSelection(direction) {
const newIndex = this.selectedIndex + direction;
if (newIndex >= 0 && newIndex < this.matches.length) {
this.selectedIndex = newIndex;
this.renderResults();
}
}
selectCurrentItem() {
if (this.selectedIndex >= 0 && this.selectedIndex < this.matches.length) {
this.selectItem(this.selectedIndex);
}
}
selectItem(index) {
const match = this.matches[index];
if (match) {
// Dispatch custom event
this.dispatchEvent(new CustomEvent('select', {
detail: match,
bubbles: true,
composed: true
}));
// Navigate to URL
window.location.href = match.url;
this.closeMenu();
}
}
openMenu() {
const dialog = this.shadowRoot.querySelector('dialog');
const input = this.shadowRoot.querySelector('.search-input');
dialog.showModal();
input.value = '';
input.focus();
// Reset state - start with no items shown
this.matches = [];
this.selectedIndex = -1;
this.renderResults();
}
closeMenu() {
const dialog = this.shadowRoot.querySelector('dialog');
dialog.close();
}
escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
}
// Register the custom element
customElements.define('navigation-search', NavigationSearch);

View file

@ -72,5 +72,7 @@
{% endfor %}
{% if select_templates %}<!-- Templates considered: {{ select_templates|join(", ") }} -->{% endif %}
<script src="{{ urls.static('navigation-search.js') }}" defer></script>
<navigation-search url="/-/tables"></navigation-search>
</body>
</html>

View file

@ -0,0 +1,275 @@
"""
SQL query builder for hierarchical permission checking.
This module implements a cascading permission system based on the pattern
from the sqlite-permissions-poc. It builds SQL queries that:
1. Start with all resources of a given type (from resource_type.resources_sql())
2. Gather permission rules from plugins (via permission_resources_sql hook)
3. Apply cascading logic: child parent global
4. Apply DENY-beats-ALLOW at each level
The core pattern is:
- Resources are identified by (parent, child) tuples
- Rules are evaluated at three levels:
- child: exact match on (parent, child)
- parent: match on (parent, NULL)
- global: match on (NULL, NULL)
- At the same level, DENY (allow=0) beats ALLOW (allow=1)
- Across levels, child beats parent beats global
"""
from typing import Optional
from datasette.plugins import pm
from datasette.utils import await_me_maybe
from datasette.utils.permissions import PluginSQL
async def build_allowed_resources_sql(
datasette,
actor: dict | None,
action: str,
) -> tuple[str, dict]:
"""
Build a SQL query that returns all resources the actor can access for this action.
Args:
datasette: The Datasette instance
actor: The actor dict (or None for unauthenticated)
action: The action name (e.g., "view-table", "view-database")
Returns:
A tuple of (sql_query, params_dict)
The returned SQL query will have three columns:
- parent: The parent resource identifier (or NULL)
- child: The child resource identifier (or NULL)
- reason: The reason from the rule that granted access
Example:
For action="view-table", this might return:
SELECT parent, child, reason FROM ... WHERE is_allowed = 1
Results would be like:
('analytics', 'users', 'role-based: analysts can access analytics DB')
('analytics', 'events', 'role-based: analysts can access analytics DB')
('production', 'orders', 'business-exception: allow production.orders for carol')
"""
# Get the Action object
action_obj = datasette.actions.get(action)
if not action_obj:
raise ValueError(f"Unknown action: {action}")
# Get base resources SQL from the resource class
base_resources_sql = action_obj.resource_class.resources_sql()
# Get all permission rule fragments from plugins via the hook
rule_results = pm.hook.permission_resources_sql(
datasette=datasette,
actor=actor,
action=action,
)
# Combine rule fragments and collect parameters
all_params = {}
rule_sqls = []
for result in rule_results:
result = await await_me_maybe(result)
if result is None:
continue
if isinstance(result, list):
for plugin_sql in result:
if isinstance(plugin_sql, PluginSQL):
rule_sqls.append(plugin_sql.sql)
all_params.update(plugin_sql.params)
elif isinstance(result, PluginSQL):
rule_sqls.append(result.sql)
all_params.update(result.params)
# If no rules, return empty result (deny all)
if not rule_sqls:
return "SELECT NULL AS parent, NULL AS child WHERE 0", {}
# Build the cascading permission query
rules_union = " UNION ALL ".join(rule_sqls)
query = f"""
WITH
base AS (
{base_resources_sql}
),
all_rules AS (
{rules_union}
),
child_lvl AS (
SELECT b.parent, b.child,
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,
MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,
MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason
FROM base b
LEFT JOIN all_rules ar ON ar.parent = b.parent AND ar.child = b.child
GROUP BY b.parent, b.child
),
parent_lvl AS (
SELECT b.parent, b.child,
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,
MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,
MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason
FROM base b
LEFT JOIN all_rules ar ON ar.parent = b.parent AND ar.child IS NULL
GROUP BY b.parent, b.child
),
global_lvl AS (
SELECT b.parent, b.child,
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow,
MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason,
MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason
FROM base b
LEFT JOIN all_rules ar ON ar.parent IS NULL AND ar.child IS NULL
GROUP BY b.parent, b.child
),
decisions AS (
SELECT
b.parent, b.child,
CASE
WHEN cl.any_deny = 1 THEN 0
WHEN cl.any_allow = 1 THEN 1
WHEN pl.any_deny = 1 THEN 0
WHEN pl.any_allow = 1 THEN 1
WHEN gl.any_deny = 1 THEN 0
WHEN gl.any_allow = 1 THEN 1
ELSE 0
END AS is_allowed,
CASE
WHEN cl.any_deny = 1 THEN cl.deny_reason
WHEN cl.any_allow = 1 THEN cl.allow_reason
WHEN pl.any_deny = 1 THEN pl.deny_reason
WHEN pl.any_allow = 1 THEN pl.allow_reason
WHEN gl.any_deny = 1 THEN gl.deny_reason
WHEN gl.any_allow = 1 THEN gl.allow_reason
ELSE 'default deny'
END AS reason
FROM base b
JOIN child_lvl cl USING (parent, child)
JOIN parent_lvl pl USING (parent, child)
JOIN global_lvl gl USING (parent, child)
)
SELECT parent, child, reason
FROM decisions
WHERE is_allowed = 1
ORDER BY parent, child
"""
return query.strip(), all_params
async def check_permission_for_resource(
datasette,
actor: dict | None,
action: str,
parent: Optional[str],
child: Optional[str],
) -> bool:
"""
Check if an actor has permission for a specific action on a specific resource.
Args:
datasette: The Datasette instance
actor: The actor dict (or None)
action: The action name
parent: The parent resource identifier (e.g., database name, or None)
child: The child resource identifier (e.g., table name, or None)
Returns:
True if the actor is allowed, False otherwise
This builds the cascading permission query and checks if the specific
resource is in the allowed set.
"""
# Get the Action object
action_obj = datasette.actions.get(action)
if not action_obj:
raise ValueError(f"Unknown action: {action}")
# Get all permission rule fragments from plugins via the hook
rule_results = pm.hook.permission_resources_sql(
datasette=datasette,
actor=actor,
action=action,
)
# Combine rule fragments and collect parameters
all_params = {}
rule_sqls = []
for result in rule_results:
result = await await_me_maybe(result)
if result is None:
continue
if isinstance(result, list):
for plugin_sql in result:
if isinstance(plugin_sql, PluginSQL):
rule_sqls.append(plugin_sql.sql)
all_params.update(plugin_sql.params)
elif isinstance(result, PluginSQL):
rule_sqls.append(result.sql)
all_params.update(result.params)
# If no rules, default deny
if not rule_sqls:
return False
# Build a simplified query that just checks for this one resource
rules_union = " UNION ALL ".join(rule_sqls)
# Add parameters for the resource we're checking
all_params["_check_parent"] = parent
all_params["_check_child"] = child
query = f"""
WITH
all_rules AS (
{rules_union}
),
child_lvl AS (
SELECT
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow
FROM all_rules ar
WHERE ar.parent = :_check_parent AND ar.child = :_check_child
),
parent_lvl AS (
SELECT
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow
FROM all_rules ar
WHERE ar.parent = :_check_parent AND ar.child IS NULL
),
global_lvl AS (
SELECT
MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny,
MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow
FROM all_rules ar
WHERE ar.parent IS NULL AND ar.child IS NULL
)
SELECT
CASE
WHEN cl.any_deny = 1 THEN 0
WHEN cl.any_allow = 1 THEN 1
WHEN pl.any_deny = 1 THEN 0
WHEN pl.any_allow = 1 THEN 1
WHEN gl.any_deny = 1 THEN 0
WHEN gl.any_allow = 1 THEN 1
ELSE 0
END AS is_allowed
FROM child_lvl cl, parent_lvl pl, global_lvl gl
"""
# Execute the query against the internal database
result = await datasette.get_internal_database().execute(query, all_params)
if result.rows:
return bool(result.rows[0][0])
return False

View file

@ -923,3 +923,48 @@ class ApiExplorerView(BaseView):
"private": private,
},
)
class TablesView(BaseView):
"""
Simple endpoint that uses the new allowed_resources() API.
Returns JSON list of all tables the actor can view.
Supports ?q=foo+bar to filter tables matching .*foo.*bar.* pattern,
ordered by shortest name first.
"""
name = "tables"
has_json_alternate = False
async def get(self, request):
# Use the new allowed_resources() method
tables = await self.ds.allowed_resources("view-table", request.actor)
# Convert to list of matches with name and url
matches = [
{
"name": f"{table.parent}/{table.child}",
"url": self.ds.urls.table(table.parent, table.child),
}
for table in tables
]
# Apply search filter if q parameter is present
q = request.args.get("q", "").strip()
if q:
import re
# Split search terms by whitespace
terms = q.split()
# Build regex pattern: .*term1.*term2.*term3.*
pattern = ".*" + ".*".join(re.escape(term) for term in terms) + ".*"
regex = re.compile(pattern, re.IGNORECASE)
# Filter tables matching the pattern (extract table name from "db/table")
matches = [m for m in matches if regex.match(m["name"].split("/", 1)[1])]
# Sort by shortest table name first
matches.sort(key=lambda m: len(m["name"].split("/", 1)[1]))
return Response.json({"matches": matches})