diff --git a/datasette/app.py b/datasette/app.py index 6c7026a8..225d66e4 100644 --- a/datasette/app.py +++ b/datasette/app.py @@ -52,6 +52,7 @@ from .views.special import ( AllowedResourcesView, PermissionRulesView, PermissionCheckView, + TablesView, ) from .views.table import ( TableInsertView, @@ -308,6 +309,7 @@ class Datasette: self.immutables = set(immutables or []) self.databases = collections.OrderedDict() self.permissions = {} # .invoke_startup() will populate this + self.actions = {} # .invoke_startup() will populate this try: self._refresh_schemas_lock = asyncio.Lock() except RuntimeError as rex: @@ -589,6 +591,33 @@ class Datasette: if p.abbr: abbrs[p.abbr] = p self.permissions[p.name] = p + + # Register actions, but watch out for duplicate name/abbr + action_names = {} + action_abbrs = {} + for hook in pm.hook.register_actions(datasette=self): + if hook: + for action in hook: + if ( + action.name in action_names + and action != action_names[action.name] + ): + raise StartupError( + "Duplicate action name: {}".format(action.name) + ) + if ( + action.abbr + and action.abbr in action_abbrs + and action != action_abbrs[action.abbr] + ): + raise StartupError( + "Duplicate action abbr: {}".format(action.abbr) + ) + action_names[action.name] = action + if action.abbr: + action_abbrs[action.abbr] = action + self.actions[action.name] = action + for hook in pm.hook.prepare_jinja2_environment( env=self._jinja_env, datasette=self ): @@ -1242,6 +1271,107 @@ class Datasette: # It's visible to everyone return True, False + async def allowed_resources( + self, + action: str, + actor: dict | None = None, + ) -> list["Resource"]: + """ + Return all resources the actor can access for the given action. + + Uses SQL to filter resources based on cascading permission rules. + Returns instances of the appropriate Resource subclass. + + Example: + tables = await datasette.allowed_resources("view-table", actor) + for table in tables: + print(f"{table.parent}/{table.child}") + """ + from datasette.utils.actions_sql import build_allowed_resources_sql + from datasette.permissions import Resource + + action_obj = self.actions.get(action) + if not action_obj: + raise ValueError(f"Unknown action: {action}") + + query, params = await build_allowed_resources_sql(self, actor, action) + result = await self.get_internal_database().execute(query, params) + + # Instantiate the appropriate Resource subclass for each row + resource_class = action_obj.resource_class + resources = [] + for row in result.rows: + # row[0]=parent, row[1]=child, row[2]=reason (ignored) + # Create instance directly with parent/child from base class + resource = object.__new__(resource_class) + Resource.__init__(resource, parent=row[0], child=row[1]) + resources.append(resource) + + return resources + + async def allowed_resources_with_reasons( + self, + action: str, + actor: dict | None = None, + ) -> list["AllowedResource"]: + """ + Return allowed resources with permission reasons for debugging. + + Uses SQL to filter resources and includes the reason each was allowed. + Returns list of AllowedResource named tuples with (resource, reason). + + Example: + debug_info = await datasette.allowed_resources_with_reasons("view-table", actor) + for allowed in debug_info: + print(f"{allowed.resource}: {allowed.reason}") + """ + from datasette.utils.actions_sql import build_allowed_resources_sql + from datasette.permissions import AllowedResource, Resource + + action_obj = self.actions.get(action) + if not action_obj: + raise ValueError(f"Unknown action: {action}") + + query, params = await build_allowed_resources_sql(self, actor, action) + result = await self.get_internal_database().execute(query, params) + + resource_class = action_obj.resource_class + resources = [] + for row in result.rows: + # Create instance directly with parent/child from base class + resource = object.__new__(resource_class) + Resource.__init__(resource, parent=row[0], child=row[1]) + reason = row[2] + resources.append(AllowedResource(resource=resource, reason=reason)) + + return resources + + async def allowed( + self, + action: str, + resource: "Resource", + actor: dict | None = None, + ) -> bool: + """ + Check if actor can perform action on specific resource. + + Uses SQL to check permission for a single resource without fetching all resources. + This is efficient - it does NOT call allowed_resources() and check membership. + + Example: + from datasette.default_actions import TableResource + can_view = await datasette.allowed( + "view-table", + TableResource(database="analytics", table="users"), + actor + ) + """ + from datasette.utils.actions_sql import check_permission_for_resource + + return await check_permission_for_resource( + self, actor, action, resource.parent, resource.child + ) + async def execute( self, db_name, @@ -1726,6 +1856,10 @@ class Datasette: ApiExplorerView.as_view(self), r"/-/api$", ) + add_route( + TablesView.as_view(self), + r"/-/tables$", + ) add_route( LogoutView.as_view(self), r"/-/logout$", diff --git a/datasette/default_actions.py b/datasette/default_actions.py new file mode 100644 index 00000000..53916259 --- /dev/null +++ b/datasette/default_actions.py @@ -0,0 +1,189 @@ +from datasette import hookimpl +from datasette.permissions import Action, Resource +from typing import Optional + + +class InstanceResource(Resource): + """The Datasette instance itself.""" + + name = "instance" + parent_name = None + + def __init__(self): + super().__init__(parent=None, child=None) + + @classmethod + def resources_sql(cls) -> str: + return "SELECT NULL AS parent, NULL AS child" + + +class DatabaseResource(Resource): + """A database in Datasette.""" + + name = "database" + parent_name = "instance" + + def __init__(self, database: str): + super().__init__(parent=database, child=None) + + @classmethod + def resources_sql(cls) -> str: + return """ + SELECT database_name AS parent, NULL AS child + FROM catalog_databases + """ + + +class TableResource(Resource): + """A table in a database.""" + + name = "table" + parent_name = "database" + + def __init__(self, database: str, table: str): + super().__init__(parent=database, child=table) + + @classmethod + def resources_sql(cls) -> str: + return """ + SELECT database_name AS parent, table_name AS child + FROM catalog_tables + """ + + +class QueryResource(Resource): + """A canned query in a database.""" + + name = "query" + parent_name = "database" + + def __init__(self, database: str, query: str): + super().__init__(parent=database, child=query) + + @classmethod + def resources_sql(cls) -> str: + # TODO: Need catalog for queries + return "SELECT NULL AS parent, NULL AS child WHERE 0" + + +@hookimpl +def register_actions(): + """Register the core Datasette actions.""" + return ( + # View actions + Action( + name="view-instance", + abbr="vi", + description="View Datasette instance", + takes_parent=False, + takes_child=False, + resource_class=InstanceResource, + ), + Action( + name="view-database", + abbr="vd", + description="View database", + takes_parent=True, + takes_child=False, + resource_class=DatabaseResource, + ), + Action( + name="view-database-download", + abbr="vdd", + description="Download database file", + takes_parent=True, + takes_child=False, + resource_class=DatabaseResource, + ), + Action( + name="view-table", + abbr="vt", + description="View table", + takes_parent=True, + takes_child=True, + resource_class=TableResource, + ), + Action( + name="view-query", + abbr="vq", + description="View named query results", + takes_parent=True, + takes_child=True, + resource_class=QueryResource, + ), + Action( + name="execute-sql", + abbr="es", + description="Execute read-only SQL queries", + takes_parent=True, + takes_child=False, + resource_class=DatabaseResource, + ), + # Debug actions + Action( + name="permissions-debug", + abbr="pd", + description="Access permission debug tool", + takes_parent=False, + takes_child=False, + resource_class=InstanceResource, + ), + Action( + name="debug-menu", + abbr="dm", + description="View debug menu items", + takes_parent=False, + takes_child=False, + resource_class=InstanceResource, + ), + # Write actions on tables + Action( + name="insert-row", + abbr="ir", + description="Insert rows", + takes_parent=True, + takes_child=True, + resource_class=TableResource, + ), + Action( + name="delete-row", + abbr="dr", + description="Delete rows", + takes_parent=True, + takes_child=True, + resource_class=TableResource, + ), + Action( + name="update-row", + abbr="ur", + description="Update rows", + takes_parent=True, + takes_child=True, + resource_class=TableResource, + ), + Action( + name="alter-table", + abbr="at", + description="Alter tables", + takes_parent=True, + takes_child=True, + resource_class=TableResource, + ), + Action( + name="drop-table", + abbr="dt", + description="Drop tables", + takes_parent=True, + takes_child=True, + resource_class=TableResource, + ), + # Schema actions on databases + Action( + name="create-table", + abbr="ct", + description="Create tables", + takes_parent=True, + takes_child=False, + resource_class=DatabaseResource, + ), + ) diff --git a/datasette/default_permissions.py b/datasette/default_permissions.py index a9534cab..25bc9590 100644 --- a/datasette/default_permissions.py +++ b/datasette/default_permissions.py @@ -289,6 +289,13 @@ async def _config_permission_rules(datasette, actor, action) -> list[PluginSQL]: db_allow_sql = db_config.get("allow_sql") add_row(db_name, None, evaluate(db_allow_sql), f"allow_sql for {db_name}") + if action == "view-table": + # Database-level allow block affects all tables in that database + db_allow = db_config.get("allow") + add_row( + db_name, None, evaluate(db_allow), f"allow for {action} on {db_name}" + ) + if action == "view-instance": allow_block = config.get("allow") add_row(None, None, evaluate(allow_block), "allow for view-instance") @@ -325,7 +332,6 @@ async def _config_permission_rules(datasette, actor, action) -> list[PluginSQL]: params[f"{key}_reason"] = reason sql = "\nUNION ALL\n".join(parts) - print(sql, params) return [PluginSQL(source="config_permissions", sql=sql, params=params)] diff --git a/datasette/hookspecs.py b/datasette/hookspecs.py index eedb2481..35c4062d 100644 --- a/datasette/hookspecs.py +++ b/datasette/hookspecs.py @@ -74,6 +74,11 @@ def register_permissions(datasette): """Register permissions: returns a list of datasette.permission.Permission named tuples""" +@hookspec +def register_actions(datasette): + """Register actions: returns a list of datasette.permission.Action objects""" + + @hookspec def register_routes(datasette): """Register URL routes: return a list of (regex, view_function) pairs""" diff --git a/datasette/permissions.py b/datasette/permissions.py index bd42158e..f83780e6 100644 --- a/datasette/permissions.py +++ b/datasette/permissions.py @@ -1,7 +1,92 @@ +from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Optional +from typing import Optional, NamedTuple +class Resource(ABC): + """ + Base class for all resource types. + + Each subclass represents a type of resource (e.g., TableResource, DatabaseResource). + The class itself carries metadata about the resource type. + Instances represent specific resources. + """ + + # Class-level metadata (subclasses must define these) + name: str = None # e.g., "table", "database", "model" + parent_name: Optional[str] = None # e.g., "database" for tables + + def __init__(self, parent: Optional[str] = None, child: Optional[str] = None): + """ + Create a resource instance. + + Args: + parent: The parent identifier (meaning depends on resource type) + child: The child identifier (meaning depends on resource type) + """ + self.parent = parent + self.child = child + + @classmethod + @abstractmethod + def resources_sql(cls) -> str: + """ + Return SQL query that returns all resources of this type. + + Must return two columns: parent, child + """ + pass + + def __str__(self) -> str: + if self.parent is None and self.child is None: + return f"{self.name}:*" + elif self.child is None: + return f"{self.name}:{self.parent}" + else: + return f"{self.name}:{self.parent}/{self.child}" + + def __repr__(self) -> str: + parts = [f"{self.__class__.__name__}("] + args = [] + if self.parent: + args.append(f"{self.parent!r}") + if self.child: + args.append(f"{self.child!r}") + parts.append(", ".join(args)) + parts.append(")") + return "".join(parts) + + def __eq__(self, other): + if not isinstance(other, Resource): + return False + return ( + self.__class__ == other.__class__ + and self.parent == other.parent + and self.child == other.child + ) + + def __hash__(self): + return hash((self.__class__, self.parent, self.child)) + + +class AllowedResource(NamedTuple): + """A resource with the reason it was allowed (for debugging).""" + + resource: Resource + reason: str + + +@dataclass(frozen=True) +class Action: + name: str + abbr: str | None + description: str | None + takes_parent: bool + takes_child: bool + resource_class: type[Resource] + + +# This is obsolete, replaced by Action and ResourceType @dataclass class Permission: name: str diff --git a/datasette/plugins.py b/datasette/plugins.py index 3769a209..288c536b 100644 --- a/datasette/plugins.py +++ b/datasette/plugins.py @@ -23,6 +23,7 @@ DEFAULT_PLUGINS = ( "datasette.sql_functions", "datasette.actor_auth_cookie", "datasette.default_permissions", + "datasette.default_actions", "datasette.default_magic_parameters", "datasette.blob_renderer", "datasette.default_menu_links", diff --git a/datasette/static/navigation-search.js b/datasette/static/navigation-search.js new file mode 100644 index 00000000..202839d5 --- /dev/null +++ b/datasette/static/navigation-search.js @@ -0,0 +1,401 @@ +class NavigationSearch extends HTMLElement { + constructor() { + super(); + this.attachShadow({ mode: 'open' }); + this.selectedIndex = -1; + this.matches = []; + this.debounceTimer = null; + + this.render(); + this.setupEventListeners(); + } + + render() { + this.shadowRoot.innerHTML = ` + + + +
+
+ +
+
+
+ Navigate + Enter Select + Esc Close +
+
+
+ `; + } + + setupEventListeners() { + const dialog = this.shadowRoot.querySelector('dialog'); + const input = this.shadowRoot.querySelector('.search-input'); + const resultsContainer = this.shadowRoot.querySelector('.results-container'); + + // Global keyboard listener for "/" + document.addEventListener('keydown', (e) => { + if (e.key === '/' && !this.isInputFocused() && !dialog.open) { + e.preventDefault(); + this.openMenu(); + } + }); + + // Input event + input.addEventListener('input', (e) => { + this.handleSearch(e.target.value); + }); + + // Keyboard navigation + input.addEventListener('keydown', (e) => { + if (e.key === 'ArrowDown') { + e.preventDefault(); + this.moveSelection(1); + } else if (e.key === 'ArrowUp') { + e.preventDefault(); + this.moveSelection(-1); + } else if (e.key === 'Enter') { + e.preventDefault(); + this.selectCurrentItem(); + } else if (e.key === 'Escape') { + this.closeMenu(); + } + }); + + // Click on result item + resultsContainer.addEventListener('click', (e) => { + const item = e.target.closest('.result-item'); + if (item) { + const index = parseInt(item.dataset.index); + this.selectItem(index); + } + }); + + // Close on backdrop click + dialog.addEventListener('click', (e) => { + if (e.target === dialog) { + this.closeMenu(); + } + }); + + // Initial load + this.loadInitialData(); + } + + isInputFocused() { + const activeElement = document.activeElement; + return activeElement && ( + activeElement.tagName === 'INPUT' || + activeElement.tagName === 'TEXTAREA' || + activeElement.isContentEditable + ); + } + + loadInitialData() { + const itemsAttr = this.getAttribute('items'); + if (itemsAttr) { + try { + this.allItems = JSON.parse(itemsAttr); + this.matches = this.allItems; + } catch (e) { + console.error('Failed to parse items attribute:', e); + this.allItems = []; + this.matches = []; + } + } + } + + handleSearch(query) { + clearTimeout(this.debounceTimer); + + this.debounceTimer = setTimeout(() => { + const url = this.getAttribute('url'); + + if (url) { + // Fetch from API + this.fetchResults(url, query); + } else { + // Filter local items + this.filterLocalItems(query); + } + }, 200); + } + + async fetchResults(url, query) { + try { + const searchUrl = `${url}?q=${encodeURIComponent(query)}`; + const response = await fetch(searchUrl); + const data = await response.json(); + this.matches = data.matches || []; + this.selectedIndex = this.matches.length > 0 ? 0 : -1; + this.renderResults(); + } catch (e) { + console.error('Failed to fetch search results:', e); + this.matches = []; + this.renderResults(); + } + } + + filterLocalItems(query) { + if (!query.trim()) { + this.matches = []; + } else { + const lowerQuery = query.toLowerCase(); + this.matches = (this.allItems || []).filter(item => + item.name.toLowerCase().includes(lowerQuery) || + item.url.toLowerCase().includes(lowerQuery) + ); + } + this.selectedIndex = this.matches.length > 0 ? 0 : -1; + this.renderResults(); + } + + renderResults() { + const container = this.shadowRoot.querySelector('.results-container'); + const input = this.shadowRoot.querySelector('.search-input'); + + if (this.matches.length === 0) { + const message = input.value.trim() ? 'No results found' : 'Start typing to search...'; + container.innerHTML = `
${message}
`; + return; + } + + container.innerHTML = this.matches.map((match, index) => ` +
+
+
${this.escapeHtml(match.name)}
+
${this.escapeHtml(match.url)}
+
+
+ `).join(''); + + // Scroll selected item into view + if (this.selectedIndex >= 0) { + const selectedItem = container.children[this.selectedIndex]; + if (selectedItem) { + selectedItem.scrollIntoView({ block: 'nearest' }); + } + } + } + + moveSelection(direction) { + const newIndex = this.selectedIndex + direction; + if (newIndex >= 0 && newIndex < this.matches.length) { + this.selectedIndex = newIndex; + this.renderResults(); + } + } + + selectCurrentItem() { + if (this.selectedIndex >= 0 && this.selectedIndex < this.matches.length) { + this.selectItem(this.selectedIndex); + } + } + + selectItem(index) { + const match = this.matches[index]; + if (match) { + // Dispatch custom event + this.dispatchEvent(new CustomEvent('select', { + detail: match, + bubbles: true, + composed: true + })); + + // Navigate to URL + window.location.href = match.url; + + this.closeMenu(); + } + } + + openMenu() { + const dialog = this.shadowRoot.querySelector('dialog'); + const input = this.shadowRoot.querySelector('.search-input'); + + dialog.showModal(); + input.value = ''; + input.focus(); + + // Reset state - start with no items shown + this.matches = []; + this.selectedIndex = -1; + this.renderResults(); + } + + closeMenu() { + const dialog = this.shadowRoot.querySelector('dialog'); + dialog.close(); + } + + escapeHtml(text) { + const div = document.createElement('div'); + div.textContent = text; + return div.innerHTML; + } +} + +// Register the custom element +customElements.define('navigation-search', NavigationSearch); \ No newline at end of file diff --git a/datasette/templates/base.html b/datasette/templates/base.html index 0b2def5a..0d89e11c 100644 --- a/datasette/templates/base.html +++ b/datasette/templates/base.html @@ -72,5 +72,7 @@ {% endfor %} {% if select_templates %}{% endif %} + + diff --git a/datasette/utils/actions_sql.py b/datasette/utils/actions_sql.py new file mode 100644 index 00000000..4dda404b --- /dev/null +++ b/datasette/utils/actions_sql.py @@ -0,0 +1,275 @@ +""" +SQL query builder for hierarchical permission checking. + +This module implements a cascading permission system based on the pattern +from the sqlite-permissions-poc. It builds SQL queries that: + +1. Start with all resources of a given type (from resource_type.resources_sql()) +2. Gather permission rules from plugins (via permission_resources_sql hook) +3. Apply cascading logic: child → parent → global +4. Apply DENY-beats-ALLOW at each level + +The core pattern is: +- Resources are identified by (parent, child) tuples +- Rules are evaluated at three levels: + - child: exact match on (parent, child) + - parent: match on (parent, NULL) + - global: match on (NULL, NULL) +- At the same level, DENY (allow=0) beats ALLOW (allow=1) +- Across levels, child beats parent beats global +""" + +from typing import Optional +from datasette.plugins import pm +from datasette.utils import await_me_maybe +from datasette.utils.permissions import PluginSQL + + +async def build_allowed_resources_sql( + datasette, + actor: dict | None, + action: str, +) -> tuple[str, dict]: + """ + Build a SQL query that returns all resources the actor can access for this action. + + Args: + datasette: The Datasette instance + actor: The actor dict (or None for unauthenticated) + action: The action name (e.g., "view-table", "view-database") + + Returns: + A tuple of (sql_query, params_dict) + + The returned SQL query will have three columns: + - parent: The parent resource identifier (or NULL) + - child: The child resource identifier (or NULL) + - reason: The reason from the rule that granted access + + Example: + For action="view-table", this might return: + SELECT parent, child, reason FROM ... WHERE is_allowed = 1 + + Results would be like: + ('analytics', 'users', 'role-based: analysts can access analytics DB') + ('analytics', 'events', 'role-based: analysts can access analytics DB') + ('production', 'orders', 'business-exception: allow production.orders for carol') + """ + # Get the Action object + action_obj = datasette.actions.get(action) + if not action_obj: + raise ValueError(f"Unknown action: {action}") + + # Get base resources SQL from the resource class + base_resources_sql = action_obj.resource_class.resources_sql() + + # Get all permission rule fragments from plugins via the hook + rule_results = pm.hook.permission_resources_sql( + datasette=datasette, + actor=actor, + action=action, + ) + + # Combine rule fragments and collect parameters + all_params = {} + rule_sqls = [] + + for result in rule_results: + result = await await_me_maybe(result) + if result is None: + continue + if isinstance(result, list): + for plugin_sql in result: + if isinstance(plugin_sql, PluginSQL): + rule_sqls.append(plugin_sql.sql) + all_params.update(plugin_sql.params) + elif isinstance(result, PluginSQL): + rule_sqls.append(result.sql) + all_params.update(result.params) + + # If no rules, return empty result (deny all) + if not rule_sqls: + return "SELECT NULL AS parent, NULL AS child WHERE 0", {} + + # Build the cascading permission query + rules_union = " UNION ALL ".join(rule_sqls) + + query = f""" +WITH +base AS ( + {base_resources_sql} +), +all_rules AS ( + {rules_union} +), +child_lvl AS ( + SELECT b.parent, b.child, + MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, + MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow, + MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason, + MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason + FROM base b + LEFT JOIN all_rules ar ON ar.parent = b.parent AND ar.child = b.child + GROUP BY b.parent, b.child +), +parent_lvl AS ( + SELECT b.parent, b.child, + MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, + MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow, + MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason, + MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason + FROM base b + LEFT JOIN all_rules ar ON ar.parent = b.parent AND ar.child IS NULL + GROUP BY b.parent, b.child +), +global_lvl AS ( + SELECT b.parent, b.child, + MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, + MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow, + MAX(CASE WHEN ar.allow = 0 THEN ar.reason ELSE NULL END) AS deny_reason, + MAX(CASE WHEN ar.allow = 1 THEN ar.reason ELSE NULL END) AS allow_reason + FROM base b + LEFT JOIN all_rules ar ON ar.parent IS NULL AND ar.child IS NULL + GROUP BY b.parent, b.child +), +decisions AS ( + SELECT + b.parent, b.child, + CASE + WHEN cl.any_deny = 1 THEN 0 + WHEN cl.any_allow = 1 THEN 1 + WHEN pl.any_deny = 1 THEN 0 + WHEN pl.any_allow = 1 THEN 1 + WHEN gl.any_deny = 1 THEN 0 + WHEN gl.any_allow = 1 THEN 1 + ELSE 0 + END AS is_allowed, + CASE + WHEN cl.any_deny = 1 THEN cl.deny_reason + WHEN cl.any_allow = 1 THEN cl.allow_reason + WHEN pl.any_deny = 1 THEN pl.deny_reason + WHEN pl.any_allow = 1 THEN pl.allow_reason + WHEN gl.any_deny = 1 THEN gl.deny_reason + WHEN gl.any_allow = 1 THEN gl.allow_reason + ELSE 'default deny' + END AS reason + FROM base b + JOIN child_lvl cl USING (parent, child) + JOIN parent_lvl pl USING (parent, child) + JOIN global_lvl gl USING (parent, child) +) +SELECT parent, child, reason +FROM decisions +WHERE is_allowed = 1 +ORDER BY parent, child +""" + return query.strip(), all_params + + +async def check_permission_for_resource( + datasette, + actor: dict | None, + action: str, + parent: Optional[str], + child: Optional[str], +) -> bool: + """ + Check if an actor has permission for a specific action on a specific resource. + + Args: + datasette: The Datasette instance + actor: The actor dict (or None) + action: The action name + parent: The parent resource identifier (e.g., database name, or None) + child: The child resource identifier (e.g., table name, or None) + + Returns: + True if the actor is allowed, False otherwise + + This builds the cascading permission query and checks if the specific + resource is in the allowed set. + """ + # Get the Action object + action_obj = datasette.actions.get(action) + if not action_obj: + raise ValueError(f"Unknown action: {action}") + + # Get all permission rule fragments from plugins via the hook + rule_results = pm.hook.permission_resources_sql( + datasette=datasette, + actor=actor, + action=action, + ) + + # Combine rule fragments and collect parameters + all_params = {} + rule_sqls = [] + + for result in rule_results: + result = await await_me_maybe(result) + if result is None: + continue + if isinstance(result, list): + for plugin_sql in result: + if isinstance(plugin_sql, PluginSQL): + rule_sqls.append(plugin_sql.sql) + all_params.update(plugin_sql.params) + elif isinstance(result, PluginSQL): + rule_sqls.append(result.sql) + all_params.update(result.params) + + # If no rules, default deny + if not rule_sqls: + return False + + # Build a simplified query that just checks for this one resource + rules_union = " UNION ALL ".join(rule_sqls) + + # Add parameters for the resource we're checking + all_params["_check_parent"] = parent + all_params["_check_child"] = child + + query = f""" +WITH +all_rules AS ( + {rules_union} +), +child_lvl AS ( + SELECT + MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, + MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow + FROM all_rules ar + WHERE ar.parent = :_check_parent AND ar.child = :_check_child +), +parent_lvl AS ( + SELECT + MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, + MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow + FROM all_rules ar + WHERE ar.parent = :_check_parent AND ar.child IS NULL +), +global_lvl AS ( + SELECT + MAX(CASE WHEN ar.allow = 0 THEN 1 ELSE 0 END) AS any_deny, + MAX(CASE WHEN ar.allow = 1 THEN 1 ELSE 0 END) AS any_allow + FROM all_rules ar + WHERE ar.parent IS NULL AND ar.child IS NULL +) +SELECT + CASE + WHEN cl.any_deny = 1 THEN 0 + WHEN cl.any_allow = 1 THEN 1 + WHEN pl.any_deny = 1 THEN 0 + WHEN pl.any_allow = 1 THEN 1 + WHEN gl.any_deny = 1 THEN 0 + WHEN gl.any_allow = 1 THEN 1 + ELSE 0 + END AS is_allowed +FROM child_lvl cl, parent_lvl pl, global_lvl gl +""" + + # Execute the query against the internal database + result = await datasette.get_internal_database().execute(query, all_params) + if result.rows: + return bool(result.rows[0][0]) + return False diff --git a/datasette/views/special.py b/datasette/views/special.py index 7e5ce517..2c5004d0 100644 --- a/datasette/views/special.py +++ b/datasette/views/special.py @@ -923,3 +923,48 @@ class ApiExplorerView(BaseView): "private": private, }, ) + + +class TablesView(BaseView): + """ + Simple endpoint that uses the new allowed_resources() API. + Returns JSON list of all tables the actor can view. + + Supports ?q=foo+bar to filter tables matching .*foo.*bar.* pattern, + ordered by shortest name first. + """ + + name = "tables" + has_json_alternate = False + + async def get(self, request): + # Use the new allowed_resources() method + tables = await self.ds.allowed_resources("view-table", request.actor) + + # Convert to list of matches with name and url + matches = [ + { + "name": f"{table.parent}/{table.child}", + "url": self.ds.urls.table(table.parent, table.child), + } + for table in tables + ] + + # Apply search filter if q parameter is present + q = request.args.get("q", "").strip() + if q: + import re + + # Split search terms by whitespace + terms = q.split() + # Build regex pattern: .*term1.*term2.*term3.* + pattern = ".*" + ".*".join(re.escape(term) for term in terms) + ".*" + regex = re.compile(pattern, re.IGNORECASE) + + # Filter tables matching the pattern (extract table name from "db/table") + matches = [m for m in matches if regex.match(m["name"].split("/", 1)[1])] + + # Sort by shortest table name first + matches.sort(key=lambda m: len(m["name"].split("/", 1)[1])) + + return Response.json({"matches": matches}) diff --git a/docs/introspection.rst b/docs/introspection.rst index ff78ec78..19c6bffb 100644 --- a/docs/introspection.rst +++ b/docs/introspection.rst @@ -144,6 +144,47 @@ Shows currently attached databases. `Databases example `_: + +.. code-block:: json + + { + "matches": [ + { + "name": "fixtures/facetable", + "url": "/fixtures/facetable" + }, + { + "name": "fixtures/searchable", + "url": "/fixtures/searchable" + } + ] + } + +Search example with ``?q=facet`` returns only tables matching ``.*facet.*``: + +.. code-block:: json + + { + "matches": [ + { + "name": "fixtures/facetable", + "url": "/fixtures/facetable" + } + ] + } + +When multiple search terms are provided (e.g., ``?q=user+profile``), tables must match the pattern ``.*user.*profile.*``. Results are ordered by shortest table name first. + .. _JsonDataView_threads: /-/threads diff --git a/docs/plugin_hooks.rst b/docs/plugin_hooks.rst index 244f448d..66c78f7e 100644 --- a/docs/plugin_hooks.rst +++ b/docs/plugin_hooks.rst @@ -782,6 +782,9 @@ The plugin hook can then be used to register the new facet class like this: register_permissions(datasette) -------------------------------- +.. note:: + This hook is deprecated. Use :ref:`plugin_register_actions` instead, which provides a more flexible resource-based permission system. + If your plugin needs to register additional permissions unique to that plugin - ``upload-csvs`` for example - you can return a list of those permissions from this hook. .. code-block:: python @@ -824,6 +827,141 @@ The fields of the ``Permission`` class are as follows: This should only be ``True`` if you want anonymous users to be able to take this action. +.. _plugin_register_actions: + +register_actions(datasette) +---------------------------- + +If your plugin needs to register actions that can be checked with Datasette's new resource-based permission system, return a list of those actions from this hook. + +Actions define what operations can be performed on resources (like viewing a table, executing SQL, or custom plugin actions). + +.. code-block:: python + + from datasette import hookimpl + from datasette.permissions import Action, Resource + + + class DocumentCollectionResource(Resource): + """A collection of documents.""" + + name = "document-collection" + parent_name = None + + def __init__(self, collection: str): + super().__init__(parent=collection, child=None) + + @classmethod + def resources_sql(cls) -> str: + return """ + SELECT collection_name AS parent, NULL AS child + FROM document_collections + """ + + + class DocumentResource(Resource): + """A document in a collection.""" + + name = "document" + parent_name = "document-collection" + + def __init__(self, collection: str, document: str): + super().__init__(parent=collection, child=document) + + @classmethod + def resources_sql(cls) -> str: + return """ + SELECT collection_name AS parent, document_id AS child + FROM documents + """ + + + @hookimpl + def register_actions(datasette): + return [ + Action( + name="list-documents", + abbr="ld", + description="List documents in a collection", + takes_parent=True, + takes_child=False, + resource_class=DocumentCollectionResource, + ), + Action( + name="view-document", + abbr="vdoc", + description="View document", + takes_parent=True, + takes_child=True, + resource_class=DocumentResource, + ), + Action( + name="edit-document", + abbr="edoc", + description="Edit document", + takes_parent=True, + takes_child=True, + resource_class=DocumentResource, + ), + ] + +The fields of the ``Action`` dataclass are as follows: + +``name`` - string + The name of the action, e.g. ``view-document``. This should be unique across all plugins. + +``abbr`` - string or None + An abbreviation of the action, e.g. ``vdoc``. This is optional. Since this needs to be unique across all installed plugins it's best to choose carefully or use ``None``. + +``description`` - string or None + A human-readable description of what the action allows you to do. + +``takes_parent`` - boolean + ``True`` if this action requires a parent identifier (like a database name). + +``takes_child`` - boolean + ``True`` if this action requires a child identifier (like a table or document name). + +``resource_class`` - type[Resource] + The Resource subclass that defines what kind of resource this action applies to. Your Resource subclass must: + + - Define a ``name`` class attribute (e.g., ``"document"``) + - Optionally define a ``parent_name`` class attribute (e.g., ``"collection"``) + - Implement a ``resources_sql()`` classmethod that returns SQL returning all resources as ``(parent, child)`` columns + - Have an ``__init__`` method that accepts appropriate parameters and calls ``super().__init__(parent=..., child=...)`` + +The ``resources_sql()`` method +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The ``resources_sql()`` classmethod is crucial to Datasette's permission system. It returns a SQL query that lists all resources of that type that exist in the system. + +This SQL query is used by Datasette to efficiently check permissions across multiple resources at once. When a user requests a list of resources (like tables, documents, or other entities), Datasette uses this SQL to: + +1. Get all resources of this type from your data catalog +2. Combine it with permission rules from the ``permission_resources_sql`` hook +3. Use SQL joins and filtering to determine which resources the actor can access +4. Return only the permitted resources + +The SQL query **must** return exactly two columns: + +- ``parent`` - The parent identifier (e.g., database name, collection name), or ``NULL`` for top-level resources +- ``child`` - The child identifier (e.g., table name, document ID), or ``NULL`` for parent-only resources + +For example, if you're building a document management plugin with collections and documents stored in a ``documents`` table, your ``resources_sql()`` might look like: + +.. code-block:: python + + @classmethod + def resources_sql(cls) -> str: + return """ + SELECT collection_name AS parent, document_id AS child + FROM documents + """ + +This tells Datasette "here's how to find all documents in the system - look in the documents table and get the collection name and document ID for each one." + +The permission system then uses this query along with rules from plugins to determine which documents each user can access, all efficiently in SQL rather than loading everything into Python. + .. _plugin_asgi_wrapper: asgi_wrapper(datasette) diff --git a/tests/test_actions_sql.py b/tests/test_actions_sql.py new file mode 100644 index 00000000..8fc8803d --- /dev/null +++ b/tests/test_actions_sql.py @@ -0,0 +1,317 @@ +""" +Tests for the new Resource-based permission system. + +These tests verify: +1. The new Datasette.allowed_resources() method +2. The new Datasette.allowed() method +3. The new Datasette.allowed_resources_with_reasons() method +4. That SQL does the heavy lifting (no Python filtering) +""" + +import pytest +import pytest_asyncio +from datasette.app import Datasette +from datasette.plugins import pm +from datasette.utils.permissions import PluginSQL +from datasette.default_actions import TableResource +from datasette import hookimpl + + +# Test plugin that provides permission rules +class PermissionRulesPlugin: + def __init__(self, rules_callback): + self.rules_callback = rules_callback + + @hookimpl + def permission_resources_sql(self, datasette, actor, action): + """Return permission rules based on the callback""" + return self.rules_callback(datasette, actor, action) + + +@pytest_asyncio.fixture +async def test_ds(): + """Create a test Datasette instance with sample data""" + ds = Datasette() + await ds.invoke_startup() + + # Add test databases with some tables + db = ds.add_memory_database("analytics") + await db.execute_write("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY)") + await db.execute_write("CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY)") + await db.execute_write( + "CREATE TABLE IF NOT EXISTS sensitive (id INTEGER PRIMARY KEY)" + ) + + db2 = ds.add_memory_database("production") + await db2.execute_write( + "CREATE TABLE IF NOT EXISTS customers (id INTEGER PRIMARY KEY)" + ) + await db2.execute_write( + "CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY)" + ) + + # Refresh schemas to populate catalog_tables in internal database + await ds._refresh_schemas() + + return ds + + +@pytest.mark.asyncio +async def test_allowed_resources_global_allow(test_ds): + """Test allowed_resources() with a global allow rule""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("id") == "alice": + sql = "SELECT NULL AS parent, NULL AS child, 1 AS allow, 'global: alice has access' AS reason" + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + # Use the new allowed_resources() method + tables = await test_ds.allowed_resources("view-table", {"id": "alice"}) + + # Alice should see all tables + assert len(tables) == 5 + assert all(isinstance(t, TableResource) for t in tables) + + # Check specific tables are present + table_set = set((t.parent, t.child) for t in tables) + assert ("analytics", "events") in table_set + assert ("analytics", "users") in table_set + assert ("analytics", "sensitive") in table_set + assert ("production", "customers") in table_set + assert ("production", "orders") in table_set + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_allowed_specific_resource(test_ds): + """Test allowed() method checks specific resource efficiently""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("role") == "analyst": + # Allow analytics database, deny everything else (global deny) + sql = """ + SELECT NULL AS parent, NULL AS child, 0 AS allow, 'global deny' AS reason + UNION ALL + SELECT 'analytics' AS parent, NULL AS child, 1 AS allow, 'analyst access' AS reason + """ + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "bob", "role": "analyst"} + + # Check specific resources using allowed() + # This should use SQL WHERE clause, not fetch all resources + assert await test_ds.allowed( + "view-table", TableResource("analytics", "users"), actor + ) + assert await test_ds.allowed( + "view-table", TableResource("analytics", "events"), actor + ) + assert not await test_ds.allowed( + "view-table", TableResource("production", "orders"), actor + ) + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_allowed_resources_with_reasons(test_ds): + """Test allowed_resources_with_reasons() exposes debugging info""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("role") == "analyst": + sql = """ + SELECT 'analytics' AS parent, NULL AS child, 1 AS allow, + 'parent: analyst access to analytics' AS reason + UNION ALL + SELECT 'analytics' AS parent, 'sensitive' AS child, 0 AS allow, + 'child: sensitive data denied' AS reason + """ + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + # Use allowed_resources_with_reasons to get debugging info + allowed = await test_ds.allowed_resources_with_reasons( + "view-table", {"id": "bob", "role": "analyst"} + ) + + # Should get analytics tables except sensitive + assert len(allowed) >= 2 # At least users and events + + # Check we can access both resource and reason + for item in allowed: + assert isinstance(item.resource, TableResource) + assert isinstance(item.reason, str) + if item.resource.parent == "analytics": + # Should mention parent-level reason + assert "analyst access" in item.reason.lower() + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_child_deny_overrides_parent_allow(test_ds): + """Test that child-level DENY beats parent-level ALLOW""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("role") == "analyst": + sql = """ + SELECT 'analytics' AS parent, NULL AS child, 1 AS allow, + 'parent: allow analytics' AS reason + UNION ALL + SELECT 'analytics' AS parent, 'sensitive' AS child, 0 AS allow, + 'child: deny sensitive' AS reason + """ + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "bob", "role": "analyst"} + tables = await test_ds.allowed_resources("view-table", actor) + + # Should see analytics tables except sensitive + analytics_tables = [t for t in tables if t.parent == "analytics"] + assert len(analytics_tables) >= 2 + + table_names = {t.child for t in analytics_tables} + assert "users" in table_names + assert "events" in table_names + assert "sensitive" not in table_names + + # Verify with allowed() method + assert await test_ds.allowed( + "view-table", TableResource("analytics", "users"), actor + ) + assert not await test_ds.allowed( + "view-table", TableResource("analytics", "sensitive"), actor + ) + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_child_allow_overrides_parent_deny(test_ds): + """Test that child-level ALLOW beats parent-level DENY""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("id") == "carol": + sql = """ + SELECT 'production' AS parent, NULL AS child, 0 AS allow, + 'parent: deny production' AS reason + UNION ALL + SELECT 'production' AS parent, 'orders' AS child, 1 AS allow, + 'child: carol can see orders' AS reason + """ + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "carol"} + tables = await test_ds.allowed_resources("view-table", actor) + + # Should only see production.orders + production_tables = [t for t in tables if t.parent == "production"] + assert len(production_tables) == 1 + assert production_tables[0].child == "orders" + + # Verify with allowed() method + assert await test_ds.allowed( + "view-table", TableResource("production", "orders"), actor + ) + assert not await test_ds.allowed( + "view-table", TableResource("production", "customers"), actor + ) + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_resource_equality_and_hashing(test_ds): + """Test that Resource instances support equality and hashing""" + + # Create some resources + r1 = TableResource("analytics", "users") + r2 = TableResource("analytics", "users") + r3 = TableResource("analytics", "events") + + # Test equality + assert r1 == r2 + assert r1 != r3 + + # Test they can be used in sets + resource_set = {r1, r2, r3} + assert len(resource_set) == 2 # r1 and r2 are the same + + # Test they can be used as dict keys + resource_dict = {r1: "data1", r3: "data2"} + assert resource_dict[r2] == "data1" # r2 same as r1 + + +@pytest.mark.asyncio +async def test_sql_does_filtering_not_python(test_ds): + """ + Verify that allowed() uses SQL WHERE clause, not Python filtering. + + This test doesn't actually verify the SQL itself (that would require + query introspection), but it demonstrates the API contract. + """ + + def rules_callback(datasette, actor, action): + # Deny everything by default, allow only analytics.users specifically + sql = """ + SELECT NULL AS parent, NULL AS child, 0 AS allow, + 'global deny' AS reason + UNION ALL + SELECT 'analytics' AS parent, 'users' AS child, 1 AS allow, + 'specific allow' AS reason + """ + return PluginSQL(source="test", sql=sql, params={}) + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + actor = {"id": "dave"} + + # allowed() should execute a targeted SQL query + # NOT fetch all resources and filter in Python + assert await test_ds.allowed( + "view-table", TableResource("analytics", "users"), actor + ) + assert not await test_ds.allowed( + "view-table", TableResource("analytics", "events"), actor + ) + + # allowed_resources() should also use SQL filtering + tables = await test_ds.allowed_resources("view-table", actor) + assert len(tables) == 1 + assert tables[0].parent == "analytics" + assert tables[0].child == "users" + + finally: + pm.unregister(plugin, name="test_plugin") diff --git a/tests/test_tables_endpoint.py b/tests/test_tables_endpoint.py new file mode 100644 index 00000000..a3305406 --- /dev/null +++ b/tests/test_tables_endpoint.py @@ -0,0 +1,544 @@ +""" +Tests for the /-/tables endpoint. + +These tests verify that the new TablesView correctly uses the allowed_resources() API. +""" + +import pytest +import pytest_asyncio +from datasette.app import Datasette +from datasette.plugins import pm +from datasette.utils.permissions import PluginSQL +from datasette import hookimpl + + +# Test plugin that provides permission rules +class PermissionRulesPlugin: + def __init__(self, rules_callback): + self.rules_callback = rules_callback + + @hookimpl + def permission_resources_sql(self, datasette, actor, action): + return self.rules_callback(datasette, actor, action) + + +@pytest_asyncio.fixture(scope="function") +async def test_ds(): + """Create a test Datasette instance with sample data (fresh for each test)""" + ds = Datasette() + await ds.invoke_startup() + + # Add test databases with some tables + db = ds.add_memory_database("analytics") + await db.execute_write("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY)") + await db.execute_write("CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY)") + await db.execute_write( + "CREATE TABLE IF NOT EXISTS sensitive (id INTEGER PRIMARY KEY)" + ) + + db2 = ds.add_memory_database("production") + await db2.execute_write( + "CREATE TABLE IF NOT EXISTS customers (id INTEGER PRIMARY KEY)" + ) + await db2.execute_write( + "CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY)" + ) + + # Refresh schemas to populate catalog_tables in internal database + await ds._refresh_schemas() + + return ds + + +@pytest.mark.asyncio +async def test_tables_endpoint_global_access(test_ds): + """Test /-/tables with global access permissions""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("id") == "alice": + sql = "SELECT NULL AS parent, NULL AS child, 1 AS allow, 'global: alice has access' AS reason" + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + # Use the allowed_resources API directly + tables = await test_ds.allowed_resources("view-table", {"id": "alice"}) + + # Convert to the format the endpoint returns + result = [ + { + "name": f"{t.parent}/{t.child}", + "url": test_ds.urls.table(t.parent, t.child), + } + for t in tables + ] + + # Alice should see all tables + assert len(result) == 5 + table_names = {m["name"] for m in result} + assert "analytics/events" in table_names + assert "analytics/users" in table_names + assert "analytics/sensitive" in table_names + assert "production/customers" in table_names + assert "production/orders" in table_names + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_tables_endpoint_database_restriction(test_ds): + """Test /-/tables with database-level restriction""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("role") == "analyst": + # Allow only analytics database + sql = "SELECT 'analytics' AS parent, NULL AS child, 1 AS allow, 'analyst access' AS reason" + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + tables = await test_ds.allowed_resources( + "view-table", {"id": "bob", "role": "analyst"} + ) + result = [ + { + "name": f"{t.parent}/{t.child}", + "url": test_ds.urls.table(t.parent, t.child), + } + for t in tables + ] + + # Bob should only see analytics tables + analytics_tables = [m for m in result if m["name"].startswith("analytics/")] + production_tables = [m for m in result if m["name"].startswith("production/")] + + assert len(analytics_tables) == 3 + table_names = {m["name"] for m in analytics_tables} + assert "analytics/events" in table_names + assert "analytics/users" in table_names + assert "analytics/sensitive" in table_names + + # Should not see production tables (unless default_permissions allows them) + # Note: default_permissions.py provides default allows, so we just check analytics are present + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_tables_endpoint_table_exception(test_ds): + """Test /-/tables with table-level exception (deny database, allow specific table)""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("id") == "carol": + # Deny analytics database, but allow analytics.users specifically + sql = """ + SELECT 'analytics' AS parent, NULL AS child, 0 AS allow, 'deny analytics' AS reason + UNION ALL + SELECT 'analytics' AS parent, 'users' AS child, 1 AS allow, 'carol exception' AS reason + """ + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + tables = await test_ds.allowed_resources("view-table", {"id": "carol"}) + result = [ + { + "name": f"{t.parent}/{t.child}", + "url": test_ds.urls.table(t.parent, t.child), + } + for t in tables + ] + + # Carol should see analytics.users but not other analytics tables + analytics_tables = [m for m in result if m["name"].startswith("analytics/")] + assert len(analytics_tables) == 1 + table_names = {m["name"] for m in analytics_tables} + assert "analytics/users" in table_names + + # Should NOT see analytics.events or analytics.sensitive + assert "analytics/events" not in table_names + assert "analytics/sensitive" not in table_names + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_tables_endpoint_deny_overrides_allow(test_ds): + """Test that child-level DENY beats parent-level ALLOW""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("role") == "analyst": + # Allow analytics, but deny sensitive table + sql = """ + SELECT 'analytics' AS parent, NULL AS child, 1 AS allow, 'allow analytics' AS reason + UNION ALL + SELECT 'analytics' AS parent, 'sensitive' AS child, 0 AS allow, 'deny sensitive' AS reason + """ + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + tables = await test_ds.allowed_resources( + "view-table", {"id": "bob", "role": "analyst"} + ) + result = [ + { + "name": f"{t.parent}/{t.child}", + "url": test_ds.urls.table(t.parent, t.child), + } + for t in tables + ] + + analytics_tables = [m for m in result if m["name"].startswith("analytics/")] + + # Should see users and events but NOT sensitive + table_names = {m["name"] for m in analytics_tables} + assert "analytics/users" in table_names + assert "analytics/events" in table_names + assert "analytics/sensitive" not in table_names + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_tables_endpoint_no_permissions(): + """Test /-/tables when user has no custom permissions (only defaults)""" + + ds = Datasette() + await ds.invoke_startup() + + # Add a single database + db = ds.add_memory_database("testdb") + await db.execute_write("CREATE TABLE items (id INTEGER PRIMARY KEY)") + await ds._refresh_schemas() + + # Unknown actor with no custom permissions + tables = await ds.allowed_resources("view-table", {"id": "unknown"}) + result = [ + {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} + for t in tables + ] + + # Should see tables (due to default_permissions.py providing default allow) + assert len(result) >= 1 + assert any(m["name"].endswith("/items") for m in result) + + +@pytest.mark.asyncio +async def test_tables_endpoint_specific_table_only(test_ds): + """Test /-/tables when only specific tables are allowed (no parent/global rules)""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("id") == "dave": + # Allow only specific tables, no parent-level or global rules + sql = """ + SELECT 'analytics' AS parent, 'users' AS child, 1 AS allow, 'specific table 1' AS reason + UNION ALL + SELECT 'production' AS parent, 'orders' AS child, 1 AS allow, 'specific table 2' AS reason + """ + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + tables = await test_ds.allowed_resources("view-table", {"id": "dave"}) + result = [ + { + "name": f"{t.parent}/{t.child}", + "url": test_ds.urls.table(t.parent, t.child), + } + for t in tables + ] + + # Should see only the two specifically allowed tables + specific_tables = [ + m for m in result if m["name"] in ("analytics/users", "production/orders") + ] + + assert len(specific_tables) == 2 + table_names = {m["name"] for m in specific_tables} + assert "analytics/users" in table_names + assert "production/orders" in table_names + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_tables_endpoint_empty_result(test_ds): + """Test /-/tables when all tables are explicitly denied""" + + def rules_callback(datasette, actor, action): + if actor and actor.get("id") == "blocked": + # Global deny + sql = "SELECT NULL AS parent, NULL AS child, 0 AS allow, 'global deny' AS reason" + return PluginSQL(source="test", sql=sql, params={}) + return None + + plugin = PermissionRulesPlugin(rules_callback) + pm.register(plugin, name="test_plugin") + + try: + tables = await test_ds.allowed_resources("view-table", {"id": "blocked"}) + result = [ + { + "name": f"{t.parent}/{t.child}", + "url": test_ds.urls.table(t.parent, t.child), + } + for t in tables + ] + + # Global deny should block access to all tables + assert len(result) == 0 + + finally: + pm.unregister(plugin, name="test_plugin") + + +@pytest.mark.asyncio +async def test_tables_endpoint_search_single_term(): + """Test /-/tables?q=user to filter tables matching 'user'""" + + ds = Datasette() + await ds.invoke_startup() + + # Add database with various table names + db = ds.add_memory_database("search_test") + await db.execute_write("CREATE TABLE users (id INTEGER)") + await db.execute_write("CREATE TABLE user_profiles (id INTEGER)") + await db.execute_write("CREATE TABLE events (id INTEGER)") + await db.execute_write("CREATE TABLE posts (id INTEGER)") + await ds._refresh_schemas() + + # Get all tables in the new format + all_tables = await ds.allowed_resources("view-table", None) + matches = [ + {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} + for t in all_tables + ] + + # Filter for "user" (extract table name from "db/table") + import re + + pattern = ".*user.*" + regex = re.compile(pattern, re.IGNORECASE) + filtered = [m for m in matches if regex.match(m["name"].split("/", 1)[1])] + + # Should match users and user_profiles but not events or posts + table_names = {m["name"].split("/", 1)[1] for m in filtered} + assert "users" in table_names + assert "user_profiles" in table_names + assert "events" not in table_names + assert "posts" not in table_names + + +@pytest.mark.asyncio +async def test_tables_endpoint_search_multiple_terms(): + """Test /-/tables?q=user+profile to filter tables matching .*user.*profile.*""" + + ds = Datasette() + await ds.invoke_startup() + + # Add database with various table names + db = ds.add_memory_database("search_test2") + await db.execute_write("CREATE TABLE user_profiles (id INTEGER)") + await db.execute_write("CREATE TABLE users (id INTEGER)") + await db.execute_write("CREATE TABLE profile_settings (id INTEGER)") + await db.execute_write("CREATE TABLE events (id INTEGER)") + await ds._refresh_schemas() + + # Get all tables in the new format + all_tables = await ds.allowed_resources("view-table", None) + matches = [ + {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} + for t in all_tables + ] + + # Filter for "user profile" (two terms, extract table name from "db/table") + import re + + terms = ["user", "profile"] + pattern = ".*" + ".*".join(re.escape(term) for term in terms) + ".*" + regex = re.compile(pattern, re.IGNORECASE) + filtered = [m for m in matches if regex.match(m["name"].split("/", 1)[1])] + + # Should match only user_profiles (has both user and profile in that order) + table_names = {m["name"].split("/", 1)[1] for m in filtered} + assert "user_profiles" in table_names + assert "users" not in table_names # doesn't have "profile" + assert "profile_settings" not in table_names # doesn't have "user" + + +@pytest.mark.asyncio +async def test_tables_endpoint_search_ordering(): + """Test that search results are ordered by shortest name first""" + + ds = Datasette() + await ds.invoke_startup() + + # Add database with tables of various lengths containing "user" + db = ds.add_memory_database("order_test") + await db.execute_write("CREATE TABLE users (id INTEGER)") + await db.execute_write("CREATE TABLE user_profiles (id INTEGER)") + await db.execute_write( + "CREATE TABLE u (id INTEGER)" + ) # Shortest, but doesn't match "user" + await db.execute_write( + "CREATE TABLE user_authentication_tokens (id INTEGER)" + ) # Longest + await db.execute_write("CREATE TABLE user_data (id INTEGER)") + await ds._refresh_schemas() + + # Get all tables in the new format + all_tables = await ds.allowed_resources("view-table", None) + matches = [ + {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} + for t in all_tables + ] + + # Filter for "user" and sort by table name length + import re + + pattern = ".*user.*" + regex = re.compile(pattern, re.IGNORECASE) + filtered = [m for m in matches if regex.match(m["name"].split("/", 1)[1])] + filtered.sort(key=lambda m: len(m["name"].split("/", 1)[1])) + + # Should be ordered: users, user_data, user_profiles, user_authentication_tokens + matching_names = [m["name"].split("/", 1)[1] for m in filtered] + assert matching_names[0] == "users" # shortest + assert len(matching_names[0]) < len(matching_names[1]) + assert len(matching_names[-1]) > len(matching_names[-2]) + assert matching_names[-1] == "user_authentication_tokens" # longest + + +@pytest.mark.asyncio +async def test_tables_endpoint_search_case_insensitive(): + """Test that search is case-insensitive""" + + ds = Datasette() + await ds.invoke_startup() + + # Add database with mixed case table names + db = ds.add_memory_database("case_test") + await db.execute_write("CREATE TABLE Users (id INTEGER)") + await db.execute_write("CREATE TABLE USER_PROFILES (id INTEGER)") + await db.execute_write("CREATE TABLE user_data (id INTEGER)") + await ds._refresh_schemas() + + # Get all tables in the new format + all_tables = await ds.allowed_resources("view-table", None) + matches = [ + {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} + for t in all_tables + ] + + # Filter for "user" (lowercase) should match all case variants + import re + + pattern = ".*user.*" + regex = re.compile(pattern, re.IGNORECASE) + filtered = [m for m in matches if regex.match(m["name"].split("/", 1)[1])] + + # Should match all three tables regardless of case + table_names = {m["name"].split("/", 1)[1] for m in filtered} + assert "Users" in table_names + assert "USER_PROFILES" in table_names + assert "user_data" in table_names + assert len(filtered) >= 3 + + +@pytest.mark.asyncio +async def test_tables_endpoint_search_no_matches(): + """Test search with no matching tables returns empty list""" + + ds = Datasette() + await ds.invoke_startup() + + # Add database with tables that won't match search + db = ds.add_memory_database("nomatch_test") + await db.execute_write("CREATE TABLE events (id INTEGER)") + await db.execute_write("CREATE TABLE posts (id INTEGER)") + await ds._refresh_schemas() + + # Get all tables in the new format + all_tables = await ds.allowed_resources("view-table", None) + matches = [ + {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} + for t in all_tables + ] + + # Filter for "zzz" which doesn't exist + import re + + pattern = ".*zzz.*" + regex = re.compile(pattern, re.IGNORECASE) + filtered = [m for m in matches if regex.match(m["name"].split("/", 1)[1])] + + # Should return empty list + assert len(filtered) == 0 + + +@pytest.mark.asyncio +async def test_tables_endpoint_config_database_allow(): + """Test that database-level allow blocks work for view-table action""" + + # Simulate: -s databases.fixtures.allow.id root + config = {"databases": {"fixtures": {"allow": {"id": "root"}}}} + + ds = Datasette(config=config) + await ds.invoke_startup() + + # Create databases + fixtures_db = ds.add_memory_database("fixtures") + await fixtures_db.execute_write("CREATE TABLE users (id INTEGER)") + await fixtures_db.execute_write("CREATE TABLE posts (id INTEGER)") + + content_db = ds.add_memory_database("content") + await content_db.execute_write("CREATE TABLE articles (id INTEGER)") + + await ds._refresh_schemas() + + # Root user should see fixtures tables + root_tables = await ds.allowed_resources("view-table", {"id": "root"}) + root_list = [ + {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} + for t in root_tables + ] + fixtures_tables_root = [m for m in root_list if m["name"].startswith("fixtures/")] + assert len(fixtures_tables_root) == 2 + table_names = {m["name"] for m in fixtures_tables_root} + assert "fixtures/users" in table_names + assert "fixtures/posts" in table_names + + # Alice should NOT see fixtures tables + alice_tables = await ds.allowed_resources("view-table", {"id": "alice"}) + alice_list = [ + {"name": f"{t.parent}/{t.child}", "url": ds.urls.table(t.parent, t.child)} + for t in alice_tables + ] + fixtures_tables_alice = [m for m in alice_list if m["name"].startswith("fixtures/")] + assert len(fixtures_tables_alice) == 0 + + # But Alice should see content tables (no restrictions) + content_tables_alice = [m for m in alice_list if m["name"].startswith("content/")] + assert len(content_tables_alice) == 1 + assert "content/articles" in {m["name"] for m in content_tables_alice}