diff --git a/datasette/app.py b/datasette/app.py index 09936b3a..177debe2 100644 --- a/datasette/app.py +++ b/datasette/app.py @@ -2363,6 +2363,12 @@ class NotFoundExplicit(NotFound): class DatasetteClient: + """Internal HTTP client for making requests to a Datasette instance. + + Used for testing and for internal operations that need to make HTTP requests + to the Datasette app without going through an actual HTTP server. + """ + def __init__(self, ds): self.ds = ds self.app = ds.app() @@ -2378,40 +2384,87 @@ class DatasetteClient: path = f"http://localhost{path}" return path - async def _request(self, method, path, **kwargs): - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=self.app), - cookies=kwargs.pop("cookies", None), - ) as client: - return await getattr(client, method)(self._fix(path), **kwargs) + async def _request(self, method, path, skip_permission_checks=False, **kwargs): + from datasette.permissions import SkipPermissions - async def get(self, path, **kwargs): - return await self._request("get", path, **kwargs) + if skip_permission_checks: + with SkipPermissions(): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=self.app), + cookies=kwargs.pop("cookies", None), + ) as client: + return await getattr(client, method)(self._fix(path), **kwargs) + else: + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=self.app), + cookies=kwargs.pop("cookies", None), + ) as client: + return await getattr(client, method)(self._fix(path), **kwargs) - async def options(self, path, **kwargs): - return await self._request("options", path, **kwargs) + async def get(self, path, skip_permission_checks=False, **kwargs): + return await self._request( + "get", path, skip_permission_checks=skip_permission_checks, **kwargs + ) - async def head(self, path, **kwargs): - return await self._request("head", path, **kwargs) + async def options(self, path, skip_permission_checks=False, **kwargs): + return await self._request( + "options", path, skip_permission_checks=skip_permission_checks, **kwargs + ) - async def post(self, path, **kwargs): - return await self._request("post", path, **kwargs) + async def head(self, path, skip_permission_checks=False, **kwargs): + return await self._request( + "head", path, skip_permission_checks=skip_permission_checks, **kwargs + ) - async def put(self, path, **kwargs): - return await self._request("put", path, **kwargs) + async def post(self, path, skip_permission_checks=False, **kwargs): + return await self._request( + "post", path, skip_permission_checks=skip_permission_checks, **kwargs + ) - async def patch(self, path, **kwargs): - return await self._request("patch", path, **kwargs) + async def put(self, path, skip_permission_checks=False, **kwargs): + return await self._request( + "put", path, skip_permission_checks=skip_permission_checks, **kwargs + ) - async def delete(self, path, **kwargs): - return await self._request("delete", path, **kwargs) + async def patch(self, path, skip_permission_checks=False, **kwargs): + return await self._request( + "patch", path, skip_permission_checks=skip_permission_checks, **kwargs + ) + + async def delete(self, path, skip_permission_checks=False, **kwargs): + return await self._request( + "delete", path, skip_permission_checks=skip_permission_checks, **kwargs + ) + + async def request(self, method, path, skip_permission_checks=False, **kwargs): + """Make an HTTP request with the specified method. + + Args: + method: HTTP method (e.g., "GET", "POST", "PUT") + path: The path to request + skip_permission_checks: If True, bypass all permission checks for this request + **kwargs: Additional arguments to pass to httpx + + Returns: + httpx.Response: The response from the request + """ + from datasette.permissions import SkipPermissions - async def request(self, method, path, **kwargs): avoid_path_rewrites = kwargs.pop("avoid_path_rewrites", None) - async with httpx.AsyncClient( - transport=httpx.ASGITransport(app=self.app), - cookies=kwargs.pop("cookies", None), - ) as client: - return await client.request( - method, self._fix(path, avoid_path_rewrites), **kwargs - ) + if skip_permission_checks: + with SkipPermissions(): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=self.app), + cookies=kwargs.pop("cookies", None), + ) as client: + return await client.request( + method, self._fix(path, avoid_path_rewrites), **kwargs + ) + else: + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=self.app), + cookies=kwargs.pop("cookies", None), + ) as client: + return await client.request( + method, self._fix(path, avoid_path_rewrites), **kwargs + ) diff --git a/datasette/permissions.py b/datasette/permissions.py index c91385a0..c48293ac 100644 --- a/datasette/permissions.py +++ b/datasette/permissions.py @@ -1,6 +1,33 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any, NamedTuple +import contextvars + + +# Context variable to track when permission checks should be skipped +_skip_permission_checks = contextvars.ContextVar( + "skip_permission_checks", default=False +) + + +class SkipPermissions: + """Context manager to temporarily skip permission checks. + + This is not a stable API and may change in future releases. + + Usage: + with SkipPermissions(): + # Permission checks are skipped within this block + response = await datasette.client.get("/protected") + """ + + def __enter__(self): + self.token = _skip_permission_checks.set(True) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + _skip_permission_checks.reset(self.token) + return False class Resource(ABC): diff --git a/datasette/utils/actions_sql.py b/datasette/utils/actions_sql.py index 7121e2d0..9c2add0e 100644 --- a/datasette/utils/actions_sql.py +++ b/datasette/utils/actions_sql.py @@ -155,6 +155,16 @@ async def _build_single_action_sql( action=action, ) + # If permission_sqls is the sentinel, skip all permission checks + # Return SQL that allows all resources + from datasette.utils.permissions import SKIP_PERMISSION_CHECKS + + if permission_sqls is SKIP_PERMISSION_CHECKS: + cols = "parent, child, 'skip_permission_checks' AS reason" + if include_is_private: + cols += ", 0 AS is_private" + return f"SELECT {cols} FROM ({base_resources_sql})", {} + all_params = {} rule_sqls = [] restriction_sqls = [] @@ -436,6 +446,17 @@ async def build_permission_rules_sql( action=action, ) + # If permission_sqls is the sentinel, skip all permission checks + # Return SQL that allows everything + from datasette.utils.permissions import SKIP_PERMISSION_CHECKS + + if permission_sqls is SKIP_PERMISSION_CHECKS: + return ( + "SELECT NULL AS parent, NULL AS child, 1 AS allow, 'skip_permission_checks' AS reason, 'skip' AS source_plugin", + {}, + [], + ) + if not permission_sqls: return ( "SELECT NULL AS parent, NULL AS child, 0 AS allow, NULL AS reason, NULL AS source_plugin WHERE 0", diff --git a/datasette/utils/permissions.py b/datasette/utils/permissions.py index 58be53a3..6c30a12a 100644 --- a/datasette/utils/permissions.py +++ b/datasette/utils/permissions.py @@ -10,13 +10,26 @@ from datasette.plugins import pm from datasette.utils import await_me_maybe +# Sentinel object to indicate permission checks should be skipped +SKIP_PERMISSION_CHECKS = object() + + async def gather_permission_sql_from_hooks( *, datasette, actor: dict | None, action: str -) -> List[PermissionSQL]: +) -> List[PermissionSQL] | object: """Collect PermissionSQL objects from the permission_resources_sql hook. Ensures that each returned PermissionSQL has a populated ``source``. + + Returns SKIP_PERMISSION_CHECKS sentinel if skip_permission_checks context variable + is set, signaling that all permission checks should be bypassed. """ + from datasette.permissions import _skip_permission_checks + + # Check if we should skip permission checks BEFORE calling hooks + # This avoids creating unawaited coroutines + if _skip_permission_checks.get(): + return SKIP_PERMISSION_CHECKS hook_caller = pm.hook.permission_resources_sql hookimpls = hook_caller.get_hookimpls() diff --git a/docs/internals.rst b/docs/internals.rst index 406bc9b3..468b3f95 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -1045,6 +1045,36 @@ These methods can be used with :ref:`internals_datasette_urls` - for example: For documentation on available ``**kwargs`` options and the shape of the HTTPX Response object refer to the `HTTPX Async documentation `__. +Bypassing permission checks +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +All ``datasette.client`` methods accept an optional ``skip_permission_checks=True`` parameter. When set, all permission checks will be bypassed for that request, allowing access to any resource regardless of the configured permissions. + +This is useful for plugins and internal operations that need to access all resources without being subject to permission restrictions. + +Example usage: + +.. code-block:: python + + # Regular request - respects permissions + response = await datasette.client.get( + "/private-db/secret-table.json" + ) + # May return 403 Forbidden if access is denied + + # With skip_permission_checks - bypasses all permission checks + response = await datasette.client.get( + "/private-db/secret-table.json", + skip_permission_checks=True, + ) + # Will return 200 OK and the data, regardless of permissions + +This parameter works with all HTTP methods (``get``, ``post``, ``put``, ``patch``, ``delete``, ``options``, ``head``) and the generic ``request`` method. + +.. warning:: + + Use ``skip_permission_checks=True`` with caution. It completely bypasses Datasette's permission system and should only be used in trusted plugin code or internal operations where you need guaranteed access to resources. + .. _internals_datasette_urls: datasette.urls diff --git a/tests/test_internals_datasette_client.py b/tests/test_internals_datasette_client.py index afc9b335..55f7392f 100644 --- a/tests/test_internals_datasette_client.py +++ b/tests/test_internals_datasette_client.py @@ -1,6 +1,7 @@ import httpx import pytest import pytest_asyncio +from datasette.app import Datasette @pytest_asyncio.fixture @@ -9,6 +10,23 @@ async def datasette(ds_client): return ds_client.ds +@pytest_asyncio.fixture +async def datasette_with_permissions(): + """A datasette instance with permission restrictions for testing""" + ds = Datasette(config={"databases": {"test_db": {"allow": {"id": "admin"}}}}) + await ds.invoke_startup() + db = ds.add_memory_database("test_db") + await db.execute_write( + "create table if not exists test_table (id integer primary key, name text)" + ) + await db.execute_write( + "insert or ignore into test_table (id, name) values (1, 'Alice')" + ) + # Trigger catalog refresh + await ds.client.get("/") + return ds + + @pytest.mark.asyncio @pytest.mark.parametrize( "method,path,expected_status", @@ -65,3 +83,147 @@ async def test_client_path(datasette, prefix, expected_path): assert path == expected_path finally: datasette._settings["base_url"] = original_base_url + + +@pytest.mark.asyncio +async def test_skip_permission_checks_allows_forbidden_access( + datasette_with_permissions, +): + """Test that skip_permission_checks=True bypasses permission checks""" + ds = datasette_with_permissions + + # Without skip_permission_checks, anonymous user should get 403 for protected database + response = await ds.client.get("/test_db.json") + assert response.status_code == 403 + + # With skip_permission_checks=True, should get 200 + response = await ds.client.get("/test_db.json", skip_permission_checks=True) + assert response.status_code == 200 + data = response.json() + assert data["database"] == "test_db" + + +@pytest.mark.asyncio +async def test_skip_permission_checks_on_table(datasette_with_permissions): + """Test skip_permission_checks works for table access""" + ds = datasette_with_permissions + + # Without skip_permission_checks, should get 403 + response = await ds.client.get("/test_db/test_table.json") + assert response.status_code == 403 + + # With skip_permission_checks=True, should get table data + response = await ds.client.get( + "/test_db/test_table.json", skip_permission_checks=True + ) + assert response.status_code == 200 + data = response.json() + assert data["rows"] == [{"id": 1, "name": "Alice"}] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "method", ["get", "post", "put", "patch", "delete", "options", "head"] +) +async def test_skip_permission_checks_all_methods(datasette_with_permissions, method): + """Test that skip_permission_checks works with all HTTP methods""" + ds = datasette_with_permissions + + # All methods should work with skip_permission_checks=True + client_method = getattr(ds.client, method) + response = await client_method("/test_db.json", skip_permission_checks=True) + # We don't check status code since some methods might not be allowed, + # but we verify the request doesn't fail due to permissions + assert isinstance(response, httpx.Response) + + +@pytest.mark.asyncio +async def test_skip_permission_checks_request_method(datasette_with_permissions): + """Test that skip_permission_checks works with client.request()""" + ds = datasette_with_permissions + + # Without skip_permission_checks + response = await ds.client.request("GET", "/test_db.json") + assert response.status_code == 403 + + # With skip_permission_checks=True + response = await ds.client.request( + "GET", "/test_db.json", skip_permission_checks=True + ) + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_skip_permission_checks_isolated_to_request(datasette_with_permissions): + """Test that skip_permission_checks doesn't affect other concurrent requests""" + ds = datasette_with_permissions + + # First request with skip_permission_checks=True should succeed + response1 = await ds.client.get("/test_db.json", skip_permission_checks=True) + assert response1.status_code == 200 + + # Subsequent request without it should still get 403 + response2 = await ds.client.get("/test_db.json") + assert response2.status_code == 403 + + # And another with skip should succeed again + response3 = await ds.client.get("/test_db.json", skip_permission_checks=True) + assert response3.status_code == 200 + + +@pytest.mark.asyncio +async def test_skip_permission_checks_with_admin_actor(datasette_with_permissions): + """Test that skip_permission_checks works even when actor is provided""" + ds = datasette_with_permissions + + # Admin actor should normally have access + admin_cookies = {"ds_actor": ds.client.actor_cookie({"id": "admin"})} + response = await ds.client.get("/test_db.json", cookies=admin_cookies) + assert response.status_code == 200 + + # Non-admin actor should get 403 + user_cookies = {"ds_actor": ds.client.actor_cookie({"id": "user"})} + response = await ds.client.get("/test_db.json", cookies=user_cookies) + assert response.status_code == 403 + + # Non-admin actor with skip_permission_checks=True should get 200 + response = await ds.client.get( + "/test_db.json", cookies=user_cookies, skip_permission_checks=True + ) + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_skip_permission_checks_shows_denied_tables(): + """Test that skip_permission_checks=True shows tables from denied databases in /-/tables.json""" + ds = Datasette( + config={ + "databases": { + "fixtures": {"allow": False} # Deny all access to this database + } + } + ) + await ds.invoke_startup() + db = ds.add_memory_database("fixtures") + await db.execute_write( + "CREATE TABLE test_table (id INTEGER PRIMARY KEY, name TEXT)" + ) + await db.execute_write("INSERT INTO test_table (id, name) VALUES (1, 'Alice')") + await ds._refresh_schemas() + + # Without skip_permission_checks, tables from denied database should not appear in /-/tables.json + response = await ds.client.get("/-/tables.json") + assert response.status_code == 200 + data = response.json() + table_names = [match["name"] for match in data["matches"]] + # Should not see any fixtures tables since access is denied + fixtures_tables = [name for name in table_names if name.startswith("fixtures:")] + assert len(fixtures_tables) == 0 + + # With skip_permission_checks=True, tables from denied database SHOULD appear + response = await ds.client.get("/-/tables.json", skip_permission_checks=True) + assert response.status_code == 200 + data = response.json() + table_names = [match["name"] for match in data["matches"]] + # Should see fixtures tables when permission checks are skipped + assert "fixtures: test_table" in table_names