mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
parent
ec99bb46f8
commit
d814e81b32
6 changed files with 335 additions and 29 deletions
109
datasette/app.py
109
datasette/app.py
|
|
@ -2363,6 +2363,12 @@ class NotFoundExplicit(NotFound):
|
||||||
|
|
||||||
|
|
||||||
class DatasetteClient:
|
class DatasetteClient:
|
||||||
|
"""Internal HTTP client for making requests to a Datasette instance.
|
||||||
|
|
||||||
|
Used for testing and for internal operations that need to make HTTP requests
|
||||||
|
to the Datasette app without going through an actual HTTP server.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, ds):
|
def __init__(self, ds):
|
||||||
self.ds = ds
|
self.ds = ds
|
||||||
self.app = ds.app()
|
self.app = ds.app()
|
||||||
|
|
@ -2378,40 +2384,87 @@ class DatasetteClient:
|
||||||
path = f"http://localhost{path}"
|
path = f"http://localhost{path}"
|
||||||
return path
|
return path
|
||||||
|
|
||||||
async def _request(self, method, path, **kwargs):
|
async def _request(self, method, path, skip_permission_checks=False, **kwargs):
|
||||||
async with httpx.AsyncClient(
|
from datasette.permissions import SkipPermissions
|
||||||
transport=httpx.ASGITransport(app=self.app),
|
|
||||||
cookies=kwargs.pop("cookies", None),
|
|
||||||
) as client:
|
|
||||||
return await getattr(client, method)(self._fix(path), **kwargs)
|
|
||||||
|
|
||||||
async def get(self, path, **kwargs):
|
if skip_permission_checks:
|
||||||
return await self._request("get", path, **kwargs)
|
with SkipPermissions():
|
||||||
|
async with httpx.AsyncClient(
|
||||||
|
transport=httpx.ASGITransport(app=self.app),
|
||||||
|
cookies=kwargs.pop("cookies", None),
|
||||||
|
) as client:
|
||||||
|
return await getattr(client, method)(self._fix(path), **kwargs)
|
||||||
|
else:
|
||||||
|
async with httpx.AsyncClient(
|
||||||
|
transport=httpx.ASGITransport(app=self.app),
|
||||||
|
cookies=kwargs.pop("cookies", None),
|
||||||
|
) as client:
|
||||||
|
return await getattr(client, method)(self._fix(path), **kwargs)
|
||||||
|
|
||||||
async def options(self, path, **kwargs):
|
async def get(self, path, skip_permission_checks=False, **kwargs):
|
||||||
return await self._request("options", path, **kwargs)
|
return await self._request(
|
||||||
|
"get", path, skip_permission_checks=skip_permission_checks, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
async def head(self, path, **kwargs):
|
async def options(self, path, skip_permission_checks=False, **kwargs):
|
||||||
return await self._request("head", path, **kwargs)
|
return await self._request(
|
||||||
|
"options", path, skip_permission_checks=skip_permission_checks, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
async def post(self, path, **kwargs):
|
async def head(self, path, skip_permission_checks=False, **kwargs):
|
||||||
return await self._request("post", path, **kwargs)
|
return await self._request(
|
||||||
|
"head", path, skip_permission_checks=skip_permission_checks, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
async def put(self, path, **kwargs):
|
async def post(self, path, skip_permission_checks=False, **kwargs):
|
||||||
return await self._request("put", path, **kwargs)
|
return await self._request(
|
||||||
|
"post", path, skip_permission_checks=skip_permission_checks, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
async def patch(self, path, **kwargs):
|
async def put(self, path, skip_permission_checks=False, **kwargs):
|
||||||
return await self._request("patch", path, **kwargs)
|
return await self._request(
|
||||||
|
"put", path, skip_permission_checks=skip_permission_checks, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
async def delete(self, path, **kwargs):
|
async def patch(self, path, skip_permission_checks=False, **kwargs):
|
||||||
return await self._request("delete", path, **kwargs)
|
return await self._request(
|
||||||
|
"patch", path, skip_permission_checks=skip_permission_checks, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
async def delete(self, path, skip_permission_checks=False, **kwargs):
|
||||||
|
return await self._request(
|
||||||
|
"delete", path, skip_permission_checks=skip_permission_checks, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
async def request(self, method, path, skip_permission_checks=False, **kwargs):
|
||||||
|
"""Make an HTTP request with the specified method.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
method: HTTP method (e.g., "GET", "POST", "PUT")
|
||||||
|
path: The path to request
|
||||||
|
skip_permission_checks: If True, bypass all permission checks for this request
|
||||||
|
**kwargs: Additional arguments to pass to httpx
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
httpx.Response: The response from the request
|
||||||
|
"""
|
||||||
|
from datasette.permissions import SkipPermissions
|
||||||
|
|
||||||
async def request(self, method, path, **kwargs):
|
|
||||||
avoid_path_rewrites = kwargs.pop("avoid_path_rewrites", None)
|
avoid_path_rewrites = kwargs.pop("avoid_path_rewrites", None)
|
||||||
async with httpx.AsyncClient(
|
if skip_permission_checks:
|
||||||
transport=httpx.ASGITransport(app=self.app),
|
with SkipPermissions():
|
||||||
cookies=kwargs.pop("cookies", None),
|
async with httpx.AsyncClient(
|
||||||
) as client:
|
transport=httpx.ASGITransport(app=self.app),
|
||||||
return await client.request(
|
cookies=kwargs.pop("cookies", None),
|
||||||
method, self._fix(path, avoid_path_rewrites), **kwargs
|
) as client:
|
||||||
)
|
return await client.request(
|
||||||
|
method, self._fix(path, avoid_path_rewrites), **kwargs
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
async with httpx.AsyncClient(
|
||||||
|
transport=httpx.ASGITransport(app=self.app),
|
||||||
|
cookies=kwargs.pop("cookies", None),
|
||||||
|
) as client:
|
||||||
|
return await client.request(
|
||||||
|
method, self._fix(path, avoid_path_rewrites), **kwargs
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,33 @@
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any, NamedTuple
|
from typing import Any, NamedTuple
|
||||||
|
import contextvars
|
||||||
|
|
||||||
|
|
||||||
|
# Context variable to track when permission checks should be skipped
|
||||||
|
_skip_permission_checks = contextvars.ContextVar(
|
||||||
|
"skip_permission_checks", default=False
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class SkipPermissions:
|
||||||
|
"""Context manager to temporarily skip permission checks.
|
||||||
|
|
||||||
|
This is not a stable API and may change in future releases.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
with SkipPermissions():
|
||||||
|
# Permission checks are skipped within this block
|
||||||
|
response = await datasette.client.get("/protected")
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.token = _skip_permission_checks.set(True)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
_skip_permission_checks.reset(self.token)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class Resource(ABC):
|
class Resource(ABC):
|
||||||
|
|
|
||||||
|
|
@ -155,6 +155,16 @@ async def _build_single_action_sql(
|
||||||
action=action,
|
action=action,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# If permission_sqls is the sentinel, skip all permission checks
|
||||||
|
# Return SQL that allows all resources
|
||||||
|
from datasette.utils.permissions import SKIP_PERMISSION_CHECKS
|
||||||
|
|
||||||
|
if permission_sqls is SKIP_PERMISSION_CHECKS:
|
||||||
|
cols = "parent, child, 'skip_permission_checks' AS reason"
|
||||||
|
if include_is_private:
|
||||||
|
cols += ", 0 AS is_private"
|
||||||
|
return f"SELECT {cols} FROM ({base_resources_sql})", {}
|
||||||
|
|
||||||
all_params = {}
|
all_params = {}
|
||||||
rule_sqls = []
|
rule_sqls = []
|
||||||
restriction_sqls = []
|
restriction_sqls = []
|
||||||
|
|
@ -436,6 +446,17 @@ async def build_permission_rules_sql(
|
||||||
action=action,
|
action=action,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# If permission_sqls is the sentinel, skip all permission checks
|
||||||
|
# Return SQL that allows everything
|
||||||
|
from datasette.utils.permissions import SKIP_PERMISSION_CHECKS
|
||||||
|
|
||||||
|
if permission_sqls is SKIP_PERMISSION_CHECKS:
|
||||||
|
return (
|
||||||
|
"SELECT NULL AS parent, NULL AS child, 1 AS allow, 'skip_permission_checks' AS reason, 'skip' AS source_plugin",
|
||||||
|
{},
|
||||||
|
[],
|
||||||
|
)
|
||||||
|
|
||||||
if not permission_sqls:
|
if not permission_sqls:
|
||||||
return (
|
return (
|
||||||
"SELECT NULL AS parent, NULL AS child, 0 AS allow, NULL AS reason, NULL AS source_plugin WHERE 0",
|
"SELECT NULL AS parent, NULL AS child, 0 AS allow, NULL AS reason, NULL AS source_plugin WHERE 0",
|
||||||
|
|
|
||||||
|
|
@ -10,13 +10,26 @@ from datasette.plugins import pm
|
||||||
from datasette.utils import await_me_maybe
|
from datasette.utils import await_me_maybe
|
||||||
|
|
||||||
|
|
||||||
|
# Sentinel object to indicate permission checks should be skipped
|
||||||
|
SKIP_PERMISSION_CHECKS = object()
|
||||||
|
|
||||||
|
|
||||||
async def gather_permission_sql_from_hooks(
|
async def gather_permission_sql_from_hooks(
|
||||||
*, datasette, actor: dict | None, action: str
|
*, datasette, actor: dict | None, action: str
|
||||||
) -> List[PermissionSQL]:
|
) -> List[PermissionSQL] | object:
|
||||||
"""Collect PermissionSQL objects from the permission_resources_sql hook.
|
"""Collect PermissionSQL objects from the permission_resources_sql hook.
|
||||||
|
|
||||||
Ensures that each returned PermissionSQL has a populated ``source``.
|
Ensures that each returned PermissionSQL has a populated ``source``.
|
||||||
|
|
||||||
|
Returns SKIP_PERMISSION_CHECKS sentinel if skip_permission_checks context variable
|
||||||
|
is set, signaling that all permission checks should be bypassed.
|
||||||
"""
|
"""
|
||||||
|
from datasette.permissions import _skip_permission_checks
|
||||||
|
|
||||||
|
# Check if we should skip permission checks BEFORE calling hooks
|
||||||
|
# This avoids creating unawaited coroutines
|
||||||
|
if _skip_permission_checks.get():
|
||||||
|
return SKIP_PERMISSION_CHECKS
|
||||||
|
|
||||||
hook_caller = pm.hook.permission_resources_sql
|
hook_caller = pm.hook.permission_resources_sql
|
||||||
hookimpls = hook_caller.get_hookimpls()
|
hookimpls = hook_caller.get_hookimpls()
|
||||||
|
|
|
||||||
|
|
@ -1045,6 +1045,36 @@ These methods can be used with :ref:`internals_datasette_urls` - for example:
|
||||||
|
|
||||||
For documentation on available ``**kwargs`` options and the shape of the HTTPX Response object refer to the `HTTPX Async documentation <https://www.python-httpx.org/async/>`__.
|
For documentation on available ``**kwargs`` options and the shape of the HTTPX Response object refer to the `HTTPX Async documentation <https://www.python-httpx.org/async/>`__.
|
||||||
|
|
||||||
|
Bypassing permission checks
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
All ``datasette.client`` methods accept an optional ``skip_permission_checks=True`` parameter. When set, all permission checks will be bypassed for that request, allowing access to any resource regardless of the configured permissions.
|
||||||
|
|
||||||
|
This is useful for plugins and internal operations that need to access all resources without being subject to permission restrictions.
|
||||||
|
|
||||||
|
Example usage:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
# Regular request - respects permissions
|
||||||
|
response = await datasette.client.get(
|
||||||
|
"/private-db/secret-table.json"
|
||||||
|
)
|
||||||
|
# May return 403 Forbidden if access is denied
|
||||||
|
|
||||||
|
# With skip_permission_checks - bypasses all permission checks
|
||||||
|
response = await datasette.client.get(
|
||||||
|
"/private-db/secret-table.json",
|
||||||
|
skip_permission_checks=True,
|
||||||
|
)
|
||||||
|
# Will return 200 OK and the data, regardless of permissions
|
||||||
|
|
||||||
|
This parameter works with all HTTP methods (``get``, ``post``, ``put``, ``patch``, ``delete``, ``options``, ``head``) and the generic ``request`` method.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
|
||||||
|
Use ``skip_permission_checks=True`` with caution. It completely bypasses Datasette's permission system and should only be used in trusted plugin code or internal operations where you need guaranteed access to resources.
|
||||||
|
|
||||||
.. _internals_datasette_urls:
|
.. _internals_datasette_urls:
|
||||||
|
|
||||||
datasette.urls
|
datasette.urls
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
import httpx
|
import httpx
|
||||||
import pytest
|
import pytest
|
||||||
import pytest_asyncio
|
import pytest_asyncio
|
||||||
|
from datasette.app import Datasette
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
@pytest_asyncio.fixture
|
||||||
|
|
@ -9,6 +10,23 @@ async def datasette(ds_client):
|
||||||
return ds_client.ds
|
return ds_client.ds
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def datasette_with_permissions():
|
||||||
|
"""A datasette instance with permission restrictions for testing"""
|
||||||
|
ds = Datasette(config={"databases": {"test_db": {"allow": {"id": "admin"}}}})
|
||||||
|
await ds.invoke_startup()
|
||||||
|
db = ds.add_memory_database("test_db")
|
||||||
|
await db.execute_write(
|
||||||
|
"create table if not exists test_table (id integer primary key, name text)"
|
||||||
|
)
|
||||||
|
await db.execute_write(
|
||||||
|
"insert or ignore into test_table (id, name) values (1, 'Alice')"
|
||||||
|
)
|
||||||
|
# Trigger catalog refresh
|
||||||
|
await ds.client.get("/")
|
||||||
|
return ds
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"method,path,expected_status",
|
"method,path,expected_status",
|
||||||
|
|
@ -65,3 +83,147 @@ async def test_client_path(datasette, prefix, expected_path):
|
||||||
assert path == expected_path
|
assert path == expected_path
|
||||||
finally:
|
finally:
|
||||||
datasette._settings["base_url"] = original_base_url
|
datasette._settings["base_url"] = original_base_url
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skip_permission_checks_allows_forbidden_access(
|
||||||
|
datasette_with_permissions,
|
||||||
|
):
|
||||||
|
"""Test that skip_permission_checks=True bypasses permission checks"""
|
||||||
|
ds = datasette_with_permissions
|
||||||
|
|
||||||
|
# Without skip_permission_checks, anonymous user should get 403 for protected database
|
||||||
|
response = await ds.client.get("/test_db.json")
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# With skip_permission_checks=True, should get 200
|
||||||
|
response = await ds.client.get("/test_db.json", skip_permission_checks=True)
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert data["database"] == "test_db"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skip_permission_checks_on_table(datasette_with_permissions):
|
||||||
|
"""Test skip_permission_checks works for table access"""
|
||||||
|
ds = datasette_with_permissions
|
||||||
|
|
||||||
|
# Without skip_permission_checks, should get 403
|
||||||
|
response = await ds.client.get("/test_db/test_table.json")
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# With skip_permission_checks=True, should get table data
|
||||||
|
response = await ds.client.get(
|
||||||
|
"/test_db/test_table.json", skip_permission_checks=True
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert data["rows"] == [{"id": 1, "name": "Alice"}]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"method", ["get", "post", "put", "patch", "delete", "options", "head"]
|
||||||
|
)
|
||||||
|
async def test_skip_permission_checks_all_methods(datasette_with_permissions, method):
|
||||||
|
"""Test that skip_permission_checks works with all HTTP methods"""
|
||||||
|
ds = datasette_with_permissions
|
||||||
|
|
||||||
|
# All methods should work with skip_permission_checks=True
|
||||||
|
client_method = getattr(ds.client, method)
|
||||||
|
response = await client_method("/test_db.json", skip_permission_checks=True)
|
||||||
|
# We don't check status code since some methods might not be allowed,
|
||||||
|
# but we verify the request doesn't fail due to permissions
|
||||||
|
assert isinstance(response, httpx.Response)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skip_permission_checks_request_method(datasette_with_permissions):
|
||||||
|
"""Test that skip_permission_checks works with client.request()"""
|
||||||
|
ds = datasette_with_permissions
|
||||||
|
|
||||||
|
# Without skip_permission_checks
|
||||||
|
response = await ds.client.request("GET", "/test_db.json")
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# With skip_permission_checks=True
|
||||||
|
response = await ds.client.request(
|
||||||
|
"GET", "/test_db.json", skip_permission_checks=True
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skip_permission_checks_isolated_to_request(datasette_with_permissions):
|
||||||
|
"""Test that skip_permission_checks doesn't affect other concurrent requests"""
|
||||||
|
ds = datasette_with_permissions
|
||||||
|
|
||||||
|
# First request with skip_permission_checks=True should succeed
|
||||||
|
response1 = await ds.client.get("/test_db.json", skip_permission_checks=True)
|
||||||
|
assert response1.status_code == 200
|
||||||
|
|
||||||
|
# Subsequent request without it should still get 403
|
||||||
|
response2 = await ds.client.get("/test_db.json")
|
||||||
|
assert response2.status_code == 403
|
||||||
|
|
||||||
|
# And another with skip should succeed again
|
||||||
|
response3 = await ds.client.get("/test_db.json", skip_permission_checks=True)
|
||||||
|
assert response3.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skip_permission_checks_with_admin_actor(datasette_with_permissions):
|
||||||
|
"""Test that skip_permission_checks works even when actor is provided"""
|
||||||
|
ds = datasette_with_permissions
|
||||||
|
|
||||||
|
# Admin actor should normally have access
|
||||||
|
admin_cookies = {"ds_actor": ds.client.actor_cookie({"id": "admin"})}
|
||||||
|
response = await ds.client.get("/test_db.json", cookies=admin_cookies)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
# Non-admin actor should get 403
|
||||||
|
user_cookies = {"ds_actor": ds.client.actor_cookie({"id": "user"})}
|
||||||
|
response = await ds.client.get("/test_db.json", cookies=user_cookies)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# Non-admin actor with skip_permission_checks=True should get 200
|
||||||
|
response = await ds.client.get(
|
||||||
|
"/test_db.json", cookies=user_cookies, skip_permission_checks=True
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skip_permission_checks_shows_denied_tables():
|
||||||
|
"""Test that skip_permission_checks=True shows tables from denied databases in /-/tables.json"""
|
||||||
|
ds = Datasette(
|
||||||
|
config={
|
||||||
|
"databases": {
|
||||||
|
"fixtures": {"allow": False} # Deny all access to this database
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
await ds.invoke_startup()
|
||||||
|
db = ds.add_memory_database("fixtures")
|
||||||
|
await db.execute_write(
|
||||||
|
"CREATE TABLE test_table (id INTEGER PRIMARY KEY, name TEXT)"
|
||||||
|
)
|
||||||
|
await db.execute_write("INSERT INTO test_table (id, name) VALUES (1, 'Alice')")
|
||||||
|
await ds._refresh_schemas()
|
||||||
|
|
||||||
|
# Without skip_permission_checks, tables from denied database should not appear in /-/tables.json
|
||||||
|
response = await ds.client.get("/-/tables.json")
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
table_names = [match["name"] for match in data["matches"]]
|
||||||
|
# Should not see any fixtures tables since access is denied
|
||||||
|
fixtures_tables = [name for name in table_names if name.startswith("fixtures:")]
|
||||||
|
assert len(fixtures_tables) == 0
|
||||||
|
|
||||||
|
# With skip_permission_checks=True, tables from denied database SHOULD appear
|
||||||
|
response = await ds.client.get("/-/tables.json", skip_permission_checks=True)
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
table_names = [match["name"] for match in data["matches"]]
|
||||||
|
# Should see fixtures tables when permission checks are skipped
|
||||||
|
assert "fixtures: test_table" in table_names
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue