Compare commits

..

4 commits

Author SHA1 Message Date
Simon Willison
92db0343c3 Updated release notes for 1.0a20
Refs #2550
2025-11-03 14:26:24 -08:00
Simon Willison
c0a87b809f Additional tests for restrict SQL
Refs https://github.com/simonw/datasette/issues/2572#issuecomment-3482778412
2025-11-03 14:07:03 -08:00
Simon Willison
161f2937cb facet_suggest_time_limit_ms 200ms in tests, closes #2574 2025-11-03 11:51:53 -08:00
Simon Willison
2fd98f4422 WIP restrictions SQL mechanism, refs #2572 2025-11-03 08:46:42 -08:00
54 changed files with 777 additions and 2764 deletions

View file

@ -2,9 +2,9 @@ name: Deploy latest.datasette.io
on:
workflow_dispatch:
push:
branches:
- main
# push:
# branches:
# - main
# - 1.0-dev
permissions:
@ -15,12 +15,19 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out datasette
uses: actions/checkout@v5
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v6
# Using Python 3.10 for gcloud compatibility:
with:
python-version: "3.13"
cache: pip
python-version: "3.10"
- uses: actions/cache@v4
name: Configure pip caching
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('**/pyproject.toml') }}
restore-keys: |
${{ runner.os }}-pip-
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
@ -95,13 +102,12 @@ jobs:
# jq '.plugins |= . + {"datasette-ephemeral-tables": {"table_ttl": 900}}' \
# > metadata.json
# cat metadata.json
- id: auth
name: Authenticate to Google Cloud
uses: google-github-actions/auth@v3
- name: Set up Cloud Run
uses: google-github-actions/setup-gcloud@v0
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v3
version: '318.0.0'
service_account_email: ${{ secrets.GCP_SA_EMAIL }}
service_account_key: ${{ secrets.GCP_SA_KEY }}
- name: Deploy to Cloud Run
env:
LATEST_DATASETTE_SECRET: ${{ secrets.LATEST_DATASETTE_SECRET }}

View file

@ -73,13 +73,12 @@ jobs:
DISABLE_SPHINX_INLINE_TABS=1 sphinx-build -b xml . _build
sphinx-to-sqlite ../docs.db _build
cd ..
- id: auth
name: Authenticate to Google Cloud
uses: google-github-actions/auth@v2
- name: Set up Cloud Run
uses: google-github-actions/setup-gcloud@v0
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v3
version: '318.0.0'
service_account_email: ${{ secrets.GCP_SA_EMAIL }}
service_account_key: ${{ secrets.GCP_SA_KEY }}
- name: Deploy stable-docs.datasette.io to Cloud Run
run: |-
gcloud config set run/region us-central1

View file

@ -1,76 +0,0 @@
name: Update Stable Docs
on:
release:
types: [published]
push:
branches:
- main
permissions:
contents: write
jobs:
update_stable_docs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v5
with:
fetch-depth: 0 # We need all commits to find docs/ changes
- name: Set up Git user
run: |
git config user.name "Automated"
git config user.email "actions@users.noreply.github.com"
- name: Create stable branch if it does not yet exist
run: |
if ! git ls-remote --heads origin stable | grep -qE '\bstable\b'; then
# Make sure we have all tags locally
git fetch --tags --quiet
# Latest tag that is just numbers and dots (optionally prefixed with 'v')
# e.g., 0.65.2 or v0.65.2 — excludes 1.0a20, 1.0-rc1, etc.
LATEST_RELEASE=$(
git tag -l --sort=-v:refname \
| grep -E '^v?[0-9]+(\.[0-9]+){1,3}$' \
| head -n1
)
git checkout -b stable
# If there are any stable releases, copy docs/ from the most recent
if [ -n "$LATEST_RELEASE" ]; then
rm -rf docs/
git checkout "$LATEST_RELEASE" -- docs/ || true
fi
git commit -m "Populate docs/ from $LATEST_RELEASE" || echo "No changes"
git push -u origin stable
fi
- name: Handle Release
if: github.event_name == 'release' && !github.event.release.prerelease
run: |
git fetch --all
git checkout stable
git reset --hard ${GITHUB_REF#refs/tags/}
git push origin stable --force
- name: Handle Commit to Main
if: contains(github.event.head_commit.message, '!stable-docs')
run: |
git fetch origin
git checkout -b stable origin/stable
# Get the list of modified files in docs/ from the current commit
FILES=$(git diff-tree --no-commit-id --name-only -r ${{ github.sha }} -- docs/)
# Check if the list of files is non-empty
if [[ -n "$FILES" ]]; then
# Checkout those files to the stable branch to over-write with their contents
for FILE in $FILES; do
git checkout ${{ github.sha }} -- $FILE
done
git add docs/
git commit -m "Doc changes from ${{ github.sha }}"
git push origin stable
else
echo "No changes to docs/ in this commit."
exit 0
fi

View file

@ -21,15 +21,15 @@ export DATASETTE_SECRET := "not_a_secret"
@lint: codespell
uv run black . --check
uv run flake8
uv run --extra test cog --check README.md docs/*.rst
uv run cog --check README.md docs/*.rst
# Rebuild docs with cog
@cog:
uv run --extra test cog -r README.md docs/*.rst
uv run cog -r README.md docs/*.rst
# Serve live docs on localhost:8000
@docs: cog blacken-docs
uv run --extra docs make -C docs livehtml
uv sync --extra docs && cd docs && uv run make livehtml
# Build docs as static HTML
@docs-build: cog blacken-docs

View file

@ -2,7 +2,6 @@ from __future__ import annotations
from asgi_csrf import Errors
import asyncio
import contextvars
from typing import TYPE_CHECKING, Any, Dict, Iterable, List
if TYPE_CHECKING:
@ -59,9 +58,6 @@ from .views.special import (
PermissionRulesView,
PermissionCheckView,
TablesView,
InstanceSchemaView,
DatabaseSchemaView,
TableSchemaView,
)
from .views.table import (
TableInsertView,
@ -131,22 +127,6 @@ from .resources import DatabaseResource, TableResource
app_root = Path(__file__).parent.parent
# Context variable to track when code is executing within a datasette.client request
_in_datasette_client = contextvars.ContextVar("in_datasette_client", default=False)
class _DatasetteClientContext:
"""Context manager to mark code as executing within a datasette.client request."""
def __enter__(self):
self.token = _in_datasette_client.set(True)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
_in_datasette_client.reset(self.token)
return False
@dataclasses.dataclass
class PermissionCheck:
"""Represents a logged permission check for debugging purposes."""
@ -321,7 +301,6 @@ class Datasette:
crossdb=False,
nolock=False,
internal=None,
default_deny=False,
):
self._startup_invoked = False
assert config_dir is None or isinstance(
@ -530,7 +509,6 @@ class Datasette:
self._permission_checks = collections.deque(maxlen=200)
self._root_token = secrets.token_hex(32)
self.root_enabled = False
self.default_deny = default_deny
self.client = DatasetteClient(self)
async def apply_metadata_json(self):
@ -606,15 +584,6 @@ class Datasette:
"select database_name, schema_version from catalog_databases"
)
}
# Delete stale entries for databases that are no longer attached
stale_databases = set(current_schema_versions.keys()) - set(
self.databases.keys()
)
for stale_db_name in stale_databases:
await internal_db.execute_write(
"DELETE FROM catalog_databases WHERE database_name = ?",
[stale_db_name],
)
for database_name, db in self.databases.items():
schema_version = (await db.execute("PRAGMA schema_version")).first()[0]
# Compare schema versions to see if we should skip it
@ -640,17 +609,6 @@ class Datasette:
def urls(self):
return Urls(self)
@property
def pm(self):
"""
Return the global plugin manager instance.
This provides access to the pluggy PluginManager that manages all
Datasette plugins and hooks. Use datasette.pm.hook.hook_name() to
call plugin hooks.
"""
return pm
async def invoke_startup(self):
# This must be called for Datasette to be in a usable state
if self._startup_invoked:
@ -703,14 +661,6 @@ class Datasette:
def unsign(self, signed, namespace="default"):
return URLSafeSerializer(self._secret, namespace).loads(signed)
def in_client(self) -> bool:
"""Check if the current code is executing within a datasette.client request.
Returns:
bool: True if currently executing within a datasette.client request, False otherwise.
"""
return _in_datasette_client.get()
def create_token(
self,
actor_id: str,
@ -779,10 +729,8 @@ class Datasette:
self.databases = new_databases
return db
def add_memory_database(self, memory_name, name=None, route=None):
return self.add_database(
Database(self, memory_name=memory_name), name=name, route=route
)
def add_memory_database(self, memory_name):
return self.add_database(Database(self, memory_name=memory_name))
def remove_database(self, name):
self.get_database(name).close()
@ -1960,10 +1908,6 @@ class Datasette:
TablesView.as_view(self),
r"/-/tables(\.(?P<format>json))?$",
)
add_route(
InstanceSchemaView.as_view(self),
r"/-/schema(\.(?P<format>json|md))?$",
)
add_route(
LogoutView.as_view(self),
r"/-/logout$",
@ -2005,10 +1949,6 @@ class Datasette:
r"/(?P<database>[^\/\.]+)(\.(?P<format>\w+))?$",
)
add_route(TableCreateView.as_view(self), r"/(?P<database>[^\/\.]+)/-/create$")
add_route(
DatabaseSchemaView.as_view(self),
r"/(?P<database>[^\/\.]+)/-/schema(\.(?P<format>json|md))?$",
)
add_route(
wrap_view(QueryView, self),
r"/(?P<database>[^\/\.]+)/-/query(\.(?P<format>\w+))?$",
@ -2033,10 +1973,6 @@ class Datasette:
TableDropView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^\/\.]+)/-/drop$",
)
add_route(
TableSchemaView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^\/\.]+)/-/schema(\.(?P<format>json|md))?$",
)
add_route(
RowDeleteView.as_view(self),
r"/(?P<database>[^\/\.]+)/(?P<table>[^/]+?)/(?P<pks>[^/]+?)/-/delete$",
@ -2427,18 +2363,9 @@ class NotFoundExplicit(NotFound):
class DatasetteClient:
"""Internal HTTP client for making requests to a Datasette instance.
Used for testing and for internal operations that need to make HTTP requests
to the Datasette app without going through an actual HTTP server.
"""
def __init__(self, ds):
self.ds = ds
@property
def app(self):
return self.ds.app()
self.app = ds.app()
def actor_cookie(self, actor):
# Utility method, mainly for tests
@ -2451,85 +2378,36 @@ class DatasetteClient:
path = f"http://localhost{path}"
return path
async def _request(self, method, path, skip_permission_checks=False, **kwargs):
from datasette.permissions import SkipPermissions
with _DatasetteClientContext():
if skip_permission_checks:
with SkipPermissions():
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=self.app),
cookies=kwargs.pop("cookies", None),
) as client:
return await getattr(client, method)(self._fix(path), **kwargs)
else:
async def _request(self, method, path, **kwargs):
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=self.app),
cookies=kwargs.pop("cookies", None),
) as client:
return await getattr(client, method)(self._fix(path), **kwargs)
async def get(self, path, skip_permission_checks=False, **kwargs):
return await self._request(
"get", path, skip_permission_checks=skip_permission_checks, **kwargs
)
async def get(self, path, **kwargs):
return await self._request("get", path, **kwargs)
async def options(self, path, skip_permission_checks=False, **kwargs):
return await self._request(
"options", path, skip_permission_checks=skip_permission_checks, **kwargs
)
async def options(self, path, **kwargs):
return await self._request("options", path, **kwargs)
async def head(self, path, skip_permission_checks=False, **kwargs):
return await self._request(
"head", path, skip_permission_checks=skip_permission_checks, **kwargs
)
async def head(self, path, **kwargs):
return await self._request("head", path, **kwargs)
async def post(self, path, skip_permission_checks=False, **kwargs):
return await self._request(
"post", path, skip_permission_checks=skip_permission_checks, **kwargs
)
async def post(self, path, **kwargs):
return await self._request("post", path, **kwargs)
async def put(self, path, skip_permission_checks=False, **kwargs):
return await self._request(
"put", path, skip_permission_checks=skip_permission_checks, **kwargs
)
async def put(self, path, **kwargs):
return await self._request("put", path, **kwargs)
async def patch(self, path, skip_permission_checks=False, **kwargs):
return await self._request(
"patch", path, skip_permission_checks=skip_permission_checks, **kwargs
)
async def patch(self, path, **kwargs):
return await self._request("patch", path, **kwargs)
async def delete(self, path, skip_permission_checks=False, **kwargs):
return await self._request(
"delete", path, skip_permission_checks=skip_permission_checks, **kwargs
)
async def request(self, method, path, skip_permission_checks=False, **kwargs):
"""Make an HTTP request with the specified method.
Args:
method: HTTP method (e.g., "GET", "POST", "PUT")
path: The path to request
skip_permission_checks: If True, bypass all permission checks for this request
**kwargs: Additional arguments to pass to httpx
Returns:
httpx.Response: The response from the request
"""
from datasette.permissions import SkipPermissions
async def delete(self, path, **kwargs):
return await self._request("delete", path, **kwargs)
async def request(self, method, path, **kwargs):
avoid_path_rewrites = kwargs.pop("avoid_path_rewrites", None)
with _DatasetteClientContext():
if skip_permission_checks:
with SkipPermissions():
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=self.app),
cookies=kwargs.pop("cookies", None),
) as client:
return await client.request(
method, self._fix(path, avoid_path_rewrites), **kwargs
)
else:
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=self.app),
cookies=kwargs.pop("cookies", None),

View file

@ -438,20 +438,10 @@ def uninstall(packages, yes):
help="Output URL that sets a cookie authenticating the root user",
is_flag=True,
)
@click.option(
"--default-deny",
help="Deny all permissions by default",
is_flag=True,
)
@click.option(
"--get",
help="Run an HTTP GET request against this path, print results and exit",
)
@click.option(
"--headers",
is_flag=True,
help="Include HTTP headers in --get output",
)
@click.option(
"--token",
help="API token to send with --get requests",
@ -519,9 +509,7 @@ def serve(
settings,
secret,
root,
default_deny,
get,
headers,
token,
actor,
version_note,
@ -600,7 +588,6 @@ def serve(
crossdb=crossdb,
nolock=nolock,
internal=internal,
default_deny=default_deny,
)
# Separate directories from files
@ -671,33 +658,19 @@ def serve(
# Run async soundness checks - but only if we're not under pytest
run_sync(lambda: check_databases(ds))
if headers and not get:
raise click.ClickException("--headers can only be used with --get")
if token and not get:
raise click.ClickException("--token can only be used with --get")
if get:
client = TestClient(ds)
request_headers = {}
headers = {}
if token:
request_headers["Authorization"] = "Bearer {}".format(token)
headers["Authorization"] = "Bearer {}".format(token)
cookies = {}
if actor:
cookies["ds_actor"] = client.actor_cookie(json.loads(actor))
response = client.get(get, headers=request_headers, cookies=cookies)
if headers:
# Output HTTP status code, headers, two newlines, then the response body
click.echo(f"HTTP/1.1 {response.status}")
for key, value in response.headers.items():
click.echo(f"{key}: {value}")
if response.text:
click.echo()
response = client.get(get, headers=headers, cookies=cookies)
click.echo(response.text)
else:
click.echo(response.text)
exit_code = 0 if response.status == 200 else 1
sys.exit(exit_code)
return

View file

@ -0,0 +1,490 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
from datasette.utils import actor_matches_allow
import itsdangerous
import time
@hookimpl(specname="permission_resources_sql")
async def actor_restrictions_sql(datasette, actor, action):
    """Handle actor restriction-based permission rules (the actor's "_r" key).

    The "_r" restriction dict has three levels:
        "a": list of globally allowed actions
        "d": {database_name: [actions]}
        "r": {database_name: {table_name: [actions]}}

    Returns:
        None when there is no actor; [] when the actor carries no
        restrictions or is globally allowed for this action (no filtering
        needed); otherwise a one-element list holding a PermissionSQL whose
        restriction_sql enumerates the allowlisted (parent, child) pairs.
    """
    if not actor:
        return None
    # Non-dict actors cannot carry restrictions
    restrictions = actor.get("_r") if isinstance(actor, dict) else None
    if restrictions is None:
        return []
    # Check if this action appears in restrictions (with abbreviations)
    action_obj = datasette.actions.get(action)
    action_checks = {action}
    if action_obj and action_obj.abbr:
        action_checks.add(action_obj.abbr)
    # Check if globally allowed in restrictions
    global_actions = restrictions.get("a", [])
    is_globally_allowed = action_checks.intersection(global_actions)
    if is_globally_allowed:
        # Globally allowed - no restriction filtering needed
        return []
    # Not globally allowed - build restriction_sql that lists allowlisted resources
    restriction_selects = []
    restriction_params = {}
    # Counter keeps generated SQL parameter names unique across both loops below
    param_counter = 0
    # Add database-level allowlisted resources (child is NULL = whole database)
    db_restrictions = restrictions.get("d", {})
    for db_name, db_actions in db_restrictions.items():
        if action_checks.intersection(db_actions):
            prefix = f"restr_{param_counter}"
            param_counter += 1
            restriction_selects.append(
                f"SELECT :{prefix}_parent AS parent, NULL AS child"
            )
            restriction_params[f"{prefix}_parent"] = db_name
    # Add table-level allowlisted resources
    resource_restrictions = restrictions.get("r", {})
    for db_name, tables in resource_restrictions.items():
        for table_name, table_actions in tables.items():
            if action_checks.intersection(table_actions):
                prefix = f"restr_{param_counter}"
                param_counter += 1
                restriction_selects.append(
                    f"SELECT :{prefix}_parent AS parent, :{prefix}_child AS child"
                )
                restriction_params[f"{prefix}_parent"] = db_name
                restriction_params[f"{prefix}_child"] = table_name
    if not restriction_selects:
        # Action not in allowlist - return empty restriction (INTERSECT will return no results)
        return [
            PermissionSQL(
                params={"deny": f"actor restrictions: {action} not in allowlist"},
                restriction_sql="SELECT NULL AS parent, NULL AS child WHERE 0",  # Empty set
            )
        ]
    # Build restriction SQL that returns allowed (parent, child) pairs
    restriction_sql = "\nUNION ALL\n".join(restriction_selects)
    # Return restriction-only PermissionSQL (sql=None means no permission rules)
    # The restriction_sql does the actual filtering via INTERSECT
    return [
        PermissionSQL(
            params=restriction_params,
            restriction_sql=restriction_sql,
        )
    ]
@hookimpl(specname="permission_resources_sql")
async def root_user_permissions_sql(datasette, actor, action):
    """Grant the root user blanket permission when root access is enabled.

    Emits a single global-level allow rule (NULL, NULL) for the root actor.
    Database-level and table-level deny rules in config can still block
    specific resources; returns None (no opinion) for everyone else.
    """
    if not datasette.root_enabled:
        return None
    if not actor or actor.get("id") != "root":
        return None
    return PermissionSQL.allow(reason="root user")
@hookimpl(specname="permission_resources_sql")
async def config_permissions_sql(datasette, actor, action):
    """Apply config-based permission rules from datasette.yaml.

    Walks the configuration's root-level "permissions"/"allow"/"allow_sql"
    blocks, then each database's blocks, then each table's and canned
    query's blocks, collecting (parent, child, allow, reason) rows. The
    rows are emitted as a single UNION ALL SELECT statement wrapped in a
    PermissionSQL, with every value passed as a named parameter.

    When the actor carries "_r" restrictions, those act as a gating filter:
    config "allow" blocks are skipped for resources outside the allowlist,
    and broad denies fan out into per-resource denies for the restricted
    resources (see add_row_allow_block below).
    """
    config = datasette.config or {}

    def evaluate(allow_block):
        # None means "no block configured" (no opinion); otherwise delegate
        # to the standard allow-block matcher for this actor.
        if allow_block is None:
            return None
        return actor_matches_allow(actor, allow_block)

    has_restrictions = actor and "_r" in actor if actor else False
    restrictions = actor.get("_r", {}) if actor else {}
    action_obj = datasette.actions.get(action)
    # Action name plus its registered abbreviation, if any
    action_checks = {action}
    if action_obj and action_obj.abbr:
        action_checks.add(action_obj.abbr)
    restricted_databases: set[str] = set()
    restricted_tables: set[tuple[str, str]] = set()
    if has_restrictions:
        # Pre-compute which databases/tables the restrictions mention for
        # this action, so the fan-out logic below can iterate them cheaply.
        restricted_databases = {
            db_name
            for db_name, db_actions in (restrictions.get("d") or {}).items()
            if action_checks.intersection(db_actions)
        }
        restricted_tables = {
            (db_name, table_name)
            for db_name, tables in (restrictions.get("r") or {}).items()
            for table_name, table_actions in tables.items()
            if action_checks.intersection(table_actions)
        }
        # Tables implicitly reference their parent databases
        restricted_databases.update(db for db, _ in restricted_tables)

    def is_in_restriction_allowlist(parent, child, action_name):
        """Check if a resource is in the actor's restriction allowlist for this action"""
        if not has_restrictions:
            return True  # No restrictions, all resources allowed
        # Check global allowlist
        if action_checks.intersection(restrictions.get("a", [])):
            return True
        # Check database-level allowlist
        if parent and action_checks.intersection(
            restrictions.get("d", {}).get(parent, [])
        ):
            return True
        # Check table-level allowlist
        if parent:
            table_restrictions = (restrictions.get("r", {}) or {}).get(parent, {})
            if child:
                table_actions = table_restrictions.get(child, [])
                if action_checks.intersection(table_actions):
                    return True
            else:
                # Parent query should proceed if any child in this database is allowlisted
                for table_actions in table_restrictions.values():
                    if action_checks.intersection(table_actions):
                        return True
        # Parent/child both None: include if any restrictions exist for this action
        if parent is None and child is None:
            if action_checks.intersection(restrictions.get("a", [])):
                return True
            if restricted_databases:
                return True
            if restricted_tables:
                return True
        return False

    rows = []

    def add_row(parent, child, result, scope):
        # "permissions" blocks: only add a row when the block matched or
        # explicitly failed to match (result None = block absent, skip).
        if result is None:
            return
        rows.append(
            (
                parent,
                child,
                bool(result),
                f"config {'allow' if result else 'deny'} {scope}",
            )
        )

    def add_row_allow_block(parent, child, allow_block, scope):
        """For 'allow' blocks, always add a row if the block exists - deny if no match"""
        if allow_block is None:
            return
        # If actor has restrictions and this resource is NOT in allowlist, skip this config rule
        # Restrictions act as a gating filter - config cannot grant access to restricted-out resources
        if not is_in_restriction_allowlist(parent, child, action):
            return
        result = evaluate(allow_block)
        bool_result = bool(result)
        # If result is None (no match) or False, treat as deny
        rows.append(
            (
                parent,
                child,
                bool_result,  # None becomes False, False stays False, True stays True
                f"config {'allow' if result else 'deny'} {scope}",
            )
        )
        if has_restrictions and not bool_result and child is None:
            reason = f"config deny {scope} (restriction gate)"
            if parent is None:
                # Root-level deny: add more specific denies for restricted resources
                if action_obj and action_obj.takes_parent:
                    for db_name in restricted_databases:
                        rows.append((db_name, None, 0, reason))
                if action_obj and action_obj.takes_child:
                    for db_name, table_name in restricted_tables:
                        rows.append((db_name, table_name, 0, reason))
            else:
                # Database-level deny: add child-level denies for restricted tables
                if action_obj and action_obj.takes_child:
                    for db_name, table_name in restricted_tables:
                        if db_name == parent:
                            rows.append((db_name, table_name, 0, reason))

    # Root-level "permissions" block for this action
    root_perm = (config.get("permissions") or {}).get(action)
    add_row(None, None, evaluate(root_perm), f"permissions for {action}")
    for db_name, db_config in (config.get("databases") or {}).items():
        db_perm = (db_config.get("permissions") or {}).get(action)
        add_row(
            db_name, None, evaluate(db_perm), f"permissions for {action} on {db_name}"
        )
        for table_name, table_config in (db_config.get("tables") or {}).items():
            table_perm = (table_config.get("permissions") or {}).get(action)
            add_row(
                db_name,
                table_name,
                evaluate(table_perm),
                f"permissions for {action} on {db_name}/{table_name}",
            )
            if action == "view-table":
                table_allow = (table_config or {}).get("allow")
                add_row_allow_block(
                    db_name,
                    table_name,
                    table_allow,
                    f"allow for {action} on {db_name}/{table_name}",
                )
        for query_name, query_config in (db_config.get("queries") or {}).items():
            # query_config can be a string (just SQL) or a dict (with SQL and options)
            if isinstance(query_config, dict):
                query_perm = (query_config.get("permissions") or {}).get(action)
                add_row(
                    db_name,
                    query_name,
                    evaluate(query_perm),
                    f"permissions for {action} on {db_name}/{query_name}",
                )
                if action == "view-query":
                    query_allow = query_config.get("allow")
                    add_row_allow_block(
                        db_name,
                        query_name,
                        query_allow,
                        f"allow for {action} on {db_name}/{query_name}",
                    )
        if action == "view-database":
            db_allow = db_config.get("allow")
            add_row_allow_block(
                db_name, None, db_allow, f"allow for {action} on {db_name}"
            )
        if action == "execute-sql":
            db_allow_sql = db_config.get("allow_sql")
            add_row_allow_block(db_name, None, db_allow_sql, f"allow_sql for {db_name}")
        if action == "view-table":
            # Database-level allow block affects all tables in that database
            db_allow = db_config.get("allow")
            add_row_allow_block(
                db_name, None, db_allow, f"allow for {action} on {db_name}"
            )
        if action == "view-query":
            # Database-level allow block affects all queries in that database
            db_allow = db_config.get("allow")
            add_row_allow_block(
                db_name, None, db_allow, f"allow for {action} on {db_name}"
            )
    # Root-level allow block applies to all view-* actions
    if action == "view-instance":
        allow_block = config.get("allow")
        add_row_allow_block(None, None, allow_block, "allow for view-instance")
    if action == "view-database":
        # Root-level allow block also applies to view-database
        allow_block = config.get("allow")
        add_row_allow_block(None, None, allow_block, "allow for view-database")
    if action == "view-table":
        # Root-level allow block also applies to view-table
        allow_block = config.get("allow")
        add_row_allow_block(None, None, allow_block, "allow for view-table")
    if action == "view-query":
        # Root-level allow block also applies to view-query
        allow_block = config.get("allow")
        add_row_allow_block(None, None, allow_block, "allow for view-query")
    if action == "execute-sql":
        allow_sql = config.get("allow_sql")
        add_row_allow_block(None, None, allow_sql, "allow_sql")
    if not rows:
        return []
    # Render the collected rows as one UNION ALL SELECT, fully parameterized
    parts = []
    params = {}
    for idx, (parent, child, allow, reason) in enumerate(rows):
        key = f"cfg_{idx}"
        parts.append(
            f"SELECT :{key}_parent AS parent, :{key}_child AS child, :{key}_allow AS allow, :{key}_reason AS reason"
        )
        params[f"{key}_parent"] = parent
        params[f"{key}_child"] = child
        params[f"{key}_allow"] = 1 if allow else 0
        params[f"{key}_reason"] = reason
    sql = "\nUNION ALL\n".join(parts)
    return [PermissionSQL(sql=sql, params=params)]
@hookimpl(specname="permission_resources_sql")
async def default_allow_sql_check(datasette, actor, action):
    """Deny execute-sql whenever the default_allow_sql setting is off.

    Returns None (no opinion) for every other action, and for execute-sql
    when the setting is enabled.
    """
    if action != "execute-sql":
        return None
    if datasette.setting("default_allow_sql"):
        return None
    return PermissionSQL.deny(reason="default_allow_sql is false")
@hookimpl(specname="permission_resources_sql")
async def default_action_permissions_sql(datasette, actor, action):
    """Apply default allow rules for the standard view/execute actions.

    With the INTERSECT-based restriction approach these defaults are always
    generated and then filtered by restriction_sql if the actor carries
    restrictions. Returns None for actions outside the default set.
    """
    if action not in (
        "view-instance",
        "view-database",
        "view-database-download",
        "view-table",
        "view-query",
        "execute-sql",
    ):
        return None
    # Double up single quotes so the reason embeds safely in SQL text
    return PermissionSQL.allow(reason=f"default allow for {action}".replace("'", "''"))
def restrictions_allow_action(
    datasette: "Datasette",
    restrictions: dict,
    action: str,
    resource: str | tuple[str, str],
):
    """
    Check if actor restrictions allow the requested action against the requested resource.

    Restrictions match exactly: having view-table does NOT imply
    view-instance or view-database. The restriction dict has three levels -
    "a" (global, any resource), "d" (per-database) and "r" (per-table) -
    and the action is accepted if any applicable level lists it.
    """
    # The action counts under its full name or its registered abbreviation.
    names = {action}
    registered = datasette.actions.get(action)
    if registered and registered.abbr:
        names.add(registered.abbr)

    def listed(allowed):
        # A level participates only when present; when present it must be a list.
        if allowed is None:
            return False
        assert isinstance(allowed, list)
        return bool(names.intersection(allowed))

    # Global level: allowed against any resource.
    if listed(restrictions.get("a")):
        return True
    # Database level: resource may be a database name or a (db, table) pair.
    if resource:
        db = resource if isinstance(resource, str) else resource[0]
        if listed(restrictions.get("d", {}).get(db)):
            return True
    # Table level: only applies to a (database, table) pair.
    if resource is not None and not isinstance(resource, str) and len(resource) == 2:
        db, table = resource
        if listed(restrictions.get("r", {}).get(db, {}).get(table)):
            return True
    # Not explicitly allowed at any level - reject.
    return False
@hookimpl
def actor_from_request(datasette, request):
    """Authenticate signed "dstok_" API tokens from the Authorization header.

    Returns an actor dict {"id": ..., "token": "dstok", ...} for a valid
    Bearer token, or None when signed tokens are disabled, the header is
    absent or malformed, the signature fails, the payload is invalid, or
    the token has expired. The max_signed_tokens_ttl setting caps every
    token's lifetime, including tokens that carry no duration of their own.
    """
    if not datasette.setting("allow_signed_tokens"):
        return None
    ttl_cap = datasette.setting("max_signed_tokens_ttl")
    header = request.headers.get("authorization")
    if not header or not header.startswith("Bearer "):
        return None
    candidate = header[len("Bearer ") :]
    if not candidate.startswith("dstok_"):
        return None
    try:
        decoded = datasette.unsign(candidate[len("dstok_") :], namespace="token")
    except itsdangerous.BadSignature:
        return None
    if "t" not in decoded:
        # Missing creation timestamp
        return None
    created = decoded["t"]
    if not isinstance(created, int):
        # Invalid timestamp
        return None
    duration = decoded.get("d")
    if duration is not None and not isinstance(duration, int):
        # Invalid duration
        return None
    # Clamp to the configured maximum TTL; a cap with no token duration
    # means the cap itself becomes the duration.
    if ttl_cap and (duration is None or duration > ttl_cap):
        duration = ttl_cap
    if duration:
        if time.time() - created > duration:
            # Expired
            return None
    actor = {"id": decoded["a"], "token": "dstok"}
    if "_r" in decoded:
        actor["_r"] = decoded["_r"]
    if duration:
        actor["token_expires"] = created + duration
    return actor
@hookimpl
def skip_csrf(scope):
    """Exempt JSON requests from the CSRF check.

    Returns True for HTTP requests whose content-type header is exactly
    b"application/json"; returns None (no opinion) otherwise.
    """
    if scope["type"] != "http":
        return None
    header_map = dict(scope.get("headers") or {})
    if header_map.get(b"content-type") == b"application/json":
        return True
    return None
@hookimpl
def canned_queries(datasette, database, actor):
    """Return the canned queries configured for this database.

    Walks config -> "databases" -> <database> -> "queries", treating any
    missing or null level as empty, so the result is always a dict.
    """
    config = datasette.config or {}
    databases = config.get("databases") or {}
    db_config = databases.get(database) or {}
    return db_config.get("queries") or {}

View file

@ -1,59 +0,0 @@
"""
Default permission implementations for Datasette.
This module provides the built-in permission checking logic through implementations
of the permission_resources_sql hook. The hooks are organized by their purpose:
1. Actor Restrictions - Enforces _r allowlists embedded in actor tokens
2. Root User - Grants full access when --root flag is used
3. Config Rules - Applies permissions from datasette.yaml
4. Default Settings - Enforces default_allow_sql and default view permissions
IMPORTANT: These hooks return PermissionSQL objects that are combined using SQL
UNION/INTERSECT operations. The order of evaluation is:
- restriction_sql fields are INTERSECTed (all must match)
- Regular sql fields are UNIONed and evaluated with cascading priority
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
# Re-export all hooks and public utilities
from .restrictions import (
actor_restrictions_sql,
restrictions_allow_action,
ActorRestrictions,
)
from .root import root_user_permissions_sql
from .config import config_permissions_sql
from .defaults import (
default_allow_sql_check,
default_action_permissions_sql,
DEFAULT_ALLOW_ACTIONS,
)
from .tokens import actor_from_signed_api_token
@hookimpl
def skip_csrf(scope) -> Optional[bool]:
    """Skip CSRF check for JSON content-type requests.

    Returns True (skip the check) for HTTP requests whose content-type
    header is exactly b"application/json"; returns None (no opinion)
    otherwise, leaving the decision to other hooks.
    """
    if scope["type"] == "http":
        # dict() collapses the (name, value) header pairs; keys are bytes
        headers = scope.get("headers") or {}
        if dict(headers).get(b"content-type") == b"application/json":
            return True
    return None
@hookimpl
def canned_queries(datasette: "Datasette", database: str, actor) -> dict:
    """Return canned queries defined in datasette.yaml configuration.

    Walks config -> "databases" -> <database> -> "queries", treating any
    missing or null level as empty, so the result is always a dict.
    """
    queries = (
        ((datasette.config or {}).get("databases") or {}).get(database) or {}
    ).get("queries") or {}
    return queries

View file

@ -1,442 +0,0 @@
"""
Config-based permission handling for Datasette.
Applies permission rules from datasette.yaml configuration.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
from datasette.utils import actor_matches_allow
from .helpers import PermissionRowCollector, get_action_name_variants
class ConfigPermissionProcessor:
    """
    Processes permission rules from datasette.yaml configuration.

    Configuration structure:

        permissions:            # Root-level permissions block
          view-instance:
            id: admin
        databases:
          mydb:
            permissions:        # Database-level permissions
              view-database:
                id: admin
            allow:              # Database-level allow block (for view-*)
              id: viewer
            allow_sql:          # execute-sql allow block
              id: analyst
            tables:
              users:
                permissions:    # Table-level permissions
                  view-table:
                    id: admin
                allow:          # Table-level allow block
                  id: viewer
            queries:
              my_query:
                permissions:    # Query-level permissions
                  view-query:
                    id: admin
                allow:          # Query-level allow block
                  id: viewer
    """

    def __init__(
        self,
        datasette: "Datasette",
        actor: Optional[dict],
        action: str,
    ):
        self.datasette = datasette
        self.actor = actor
        self.action = action
        self.config = datasette.config or {}
        # Accumulates (parent, child, allow, reason) rows; "cfg" prefixes
        # the SQL parameter names so they can't collide with other hooks
        self.collector = PermissionRowCollector(prefix="cfg")
        # Pre-compute action variants (full name plus abbreviation, if any)
        self.action_checks = get_action_name_variants(datasette, action)
        self.action_obj = datasette.actions.get(action)
        # Parse restrictions if present (the actor's _r allowlist)
        self.has_restrictions = actor and "_r" in actor if actor else False
        self.restrictions = actor.get("_r", {}) if actor else {}
        # Pre-compute restriction info for efficiency:
        # databases (_r.d) and (database, table) pairs (_r.r) whose
        # allowlists mention this action or its abbreviation
        self.restricted_databases: Set[str] = set()
        self.restricted_tables: Set[Tuple[str, str]] = set()
        if self.has_restrictions:
            self.restricted_databases = {
                db_name
                for db_name, db_actions in (self.restrictions.get("d") or {}).items()
                if self.action_checks.intersection(db_actions)
            }
            self.restricted_tables = {
                (db_name, table_name)
                for db_name, tables in (self.restrictions.get("r") or {}).items()
                for table_name, table_actions in tables.items()
                if self.action_checks.intersection(table_actions)
            }
            # Tables implicitly reference their parent databases
            self.restricted_databases.update(db for db, _ in self.restricted_tables)

    def evaluate_allow_block(self, allow_block: Any) -> Optional[bool]:
        """Evaluate an allow block against the current actor.

        Returns None when there is no block, otherwise the result of
        actor_matches_allow (truthy = allow, falsy = deny).
        """
        if allow_block is None:
            return None
        return actor_matches_allow(self.actor, allow_block)

    def is_in_restriction_allowlist(
        self,
        parent: Optional[str],
        child: Optional[str],
    ) -> bool:
        """Check if resource is allowed by actor restrictions.

        parent is a database name (or None for instance level); child is
        a table/query name (or None for database level).
        """
        if not self.has_restrictions:
            return True  # No restrictions, all resources allowed
        # Check global allowlist (_r.a)
        if self.action_checks.intersection(self.restrictions.get("a", [])):
            return True
        # Check database-level allowlist (_r.d)
        if parent and self.action_checks.intersection(
            self.restrictions.get("d", {}).get(parent, [])
        ):
            return True
        # Check table-level allowlist (_r.r)
        if parent:
            table_restrictions = (self.restrictions.get("r", {}) or {}).get(parent, {})
            if child:
                table_actions = table_restrictions.get(child, [])
                if self.action_checks.intersection(table_actions):
                    return True
            else:
                # Parent query should proceed if any child in this database
                # is allowlisted
                for table_actions in table_restrictions.values():
                    if self.action_checks.intersection(table_actions):
                        return True
        # Parent/child both None: include if any restrictions exist for this action
        if parent is None and child is None:
            if self.action_checks.intersection(self.restrictions.get("a", [])):
                return True
            if self.restricted_databases:
                return True
            if self.restricted_tables:
                return True
        return False

    def add_permissions_rule(
        self,
        parent: Optional[str],
        child: Optional[str],
        permissions_block: Optional[dict],
        scope_desc: str,
    ) -> None:
        """Add a rule from a permissions:{action} block.

        A missing block (result None) adds no row at all, thanks to
        if_not_none=True below.
        """
        if permissions_block is None:
            return
        action_allow_block = permissions_block.get(self.action)
        result = self.evaluate_allow_block(action_allow_block)
        self.collector.add(
            parent=parent,
            child=child,
            allow=result,
            reason=f"config {'allow' if result else 'deny'} {scope_desc}",
            if_not_none=True,
        )

    def add_allow_block_rule(
        self,
        parent: Optional[str],
        child: Optional[str],
        allow_block: Any,
        scope_desc: str,
    ) -> None:
        """
        Add rules from an allow:{} block.

        For allow blocks, if the block exists but doesn't match the actor,
        this is treated as a deny. We also handle the restriction-gate logic.
        """
        if allow_block is None:
            return
        # Skip if resource is not in restriction allowlist
        if not self.is_in_restriction_allowlist(parent, child):
            return
        result = self.evaluate_allow_block(allow_block)
        bool_result = bool(result)
        self.collector.add(
            parent,
            child,
            bool_result,
            f"config {'allow' if result else 'deny'} {scope_desc}",
        )
        # Handle restriction-gate: add explicit denies for restricted resources
        self._add_restriction_gate_denies(parent, child, bool_result, scope_desc)

    def _add_restriction_gate_denies(
        self,
        parent: Optional[str],
        child: Optional[str],
        is_allowed: bool,
        scope_desc: str,
    ) -> None:
        """
        When a config rule denies at a higher level, add explicit denies
        for restricted resources to prevent child-level allows from
        incorrectly granting access.

        Only runs for denies (is_allowed False) at parent or root level
        (child is None) when the actor actually has restrictions.
        """
        if is_allowed or child is not None or not self.has_restrictions:
            return
        if not self.action_obj:
            return
        reason = f"config deny {scope_desc} (restriction gate)"
        if parent is None:
            # Root-level deny: add denies for all restricted resources
            if self.action_obj.takes_parent:
                for db_name in self.restricted_databases:
                    self.collector.add(db_name, None, False, reason)
            if self.action_obj.takes_child:
                for db_name, table_name in self.restricted_tables:
                    self.collector.add(db_name, table_name, False, reason)
        else:
            # Database-level deny: add denies for tables in that database
            if self.action_obj.takes_child:
                for db_name, table_name in self.restricted_tables:
                    if db_name == parent:
                        self.collector.add(db_name, table_name, False, reason)

    def process(self) -> Optional[PermissionSQL]:
        """Process all config rules and return combined PermissionSQL.

        Returns None when the configuration produced no rules at all.
        """
        self._process_root_permissions()
        self._process_databases()
        self._process_root_allow_blocks()
        return self.collector.to_permission_sql()

    def _process_root_permissions(self) -> None:
        """Process root-level permissions block."""
        root_perms = self.config.get("permissions") or {}
        self.add_permissions_rule(
            None,
            None,
            root_perms,
            f"permissions for {self.action}",
        )

    def _process_databases(self) -> None:
        """Process database-level and nested configurations."""
        databases = self.config.get("databases") or {}
        for db_name, db_config in databases.items():
            self._process_database(db_name, db_config or {})

    def _process_database(self, db_name: str, db_config: dict) -> None:
        """Process a single database's configuration."""
        # Database-level permissions block
        db_perms = db_config.get("permissions") or {}
        self.add_permissions_rule(
            db_name,
            None,
            db_perms,
            f"permissions for {self.action} on {db_name}",
        )
        # Process tables
        for table_name, table_config in (db_config.get("tables") or {}).items():
            self._process_table(db_name, table_name, table_config or {})
        # Process queries
        for query_name, query_config in (db_config.get("queries") or {}).items():
            self._process_query(db_name, query_name, query_config)
        # Database-level allow blocks
        self._process_database_allow_blocks(db_name, db_config)

    def _process_table(
        self,
        db_name: str,
        table_name: str,
        table_config: dict,
    ) -> None:
        """Process a single table's configuration."""
        # Table-level permissions block
        table_perms = table_config.get("permissions") or {}
        self.add_permissions_rule(
            db_name,
            table_name,
            table_perms,
            f"permissions for {self.action} on {db_name}/{table_name}",
        )
        # Table-level allow block (for view-table)
        if self.action == "view-table":
            self.add_allow_block_rule(
                db_name,
                table_name,
                table_config.get("allow"),
                f"allow for {self.action} on {db_name}/{table_name}",
            )

    def _process_query(
        self,
        db_name: str,
        query_name: str,
        query_config: Any,
    ) -> None:
        """Process a single query's configuration.

        The query name occupies the "child" slot, in parallel with tables.
        """
        # Query config can be a string (just SQL) or dict
        if not isinstance(query_config, dict):
            return
        # Query-level permissions block
        query_perms = query_config.get("permissions") or {}
        self.add_permissions_rule(
            db_name,
            query_name,
            query_perms,
            f"permissions for {self.action} on {db_name}/{query_name}",
        )
        # Query-level allow block (for view-query)
        if self.action == "view-query":
            self.add_allow_block_rule(
                db_name,
                query_name,
                query_config.get("allow"),
                f"allow for {self.action} on {db_name}/{query_name}",
            )

    def _process_database_allow_blocks(
        self,
        db_name: str,
        db_config: dict,
    ) -> None:
        """Process database-level allow/allow_sql blocks.

        view-table and view-query inherit the database-level allow block
        in addition to view-database itself.
        """
        # view-database allow block
        if self.action == "view-database":
            self.add_allow_block_rule(
                db_name,
                None,
                db_config.get("allow"),
                f"allow for {self.action} on {db_name}",
            )
        # execute-sql allow_sql block
        if self.action == "execute-sql":
            self.add_allow_block_rule(
                db_name,
                None,
                db_config.get("allow_sql"),
                f"allow_sql for {db_name}",
            )
        # view-table uses database-level allow for inheritance
        if self.action == "view-table":
            self.add_allow_block_rule(
                db_name,
                None,
                db_config.get("allow"),
                f"allow for {self.action} on {db_name}",
            )
        # view-query uses database-level allow for inheritance
        if self.action == "view-query":
            self.add_allow_block_rule(
                db_name,
                None,
                db_config.get("allow"),
                f"allow for {self.action} on {db_name}",
            )

    def _process_root_allow_blocks(self) -> None:
        """Process root-level allow/allow_sql blocks.

        The root allow block applies to every view-* action; allow_sql
        applies to execute-sql only.
        """
        root_allow = self.config.get("allow")
        if self.action == "view-instance":
            self.add_allow_block_rule(
                None,
                None,
                root_allow,
                "allow for view-instance",
            )
        if self.action == "view-database":
            self.add_allow_block_rule(
                None,
                None,
                root_allow,
                "allow for view-database",
            )
        if self.action == "view-table":
            self.add_allow_block_rule(
                None,
                None,
                root_allow,
                "allow for view-table",
            )
        if self.action == "view-query":
            self.add_allow_block_rule(
                None,
                None,
                root_allow,
                "allow for view-query",
            )
        if self.action == "execute-sql":
            self.add_allow_block_rule(
                None,
                None,
                self.config.get("allow_sql"),
                "allow_sql",
            )
@hookimpl(specname="permission_resources_sql")
async def config_permissions_sql(
    datasette: "Datasette",
    actor: Optional[dict],
    action: str,
) -> Optional[List[PermissionSQL]]:
    """
    Apply permission rules from datasette.yaml configuration.

    Delegates to ConfigPermissionProcessor, which handles:
    - permissions: blocks at root, database, table, and query levels
    - allow: blocks for view-* actions
    - allow_sql: blocks for execute-sql action
    """
    result = ConfigPermissionProcessor(datasette, actor, action).process()
    return [] if result is None else [result]

View file

@ -1,70 +0,0 @@
"""
Default permission settings for Datasette.
Provides default allow rules for standard view/execute actions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
# Actions that are allowed by default (unless --default-deny is used).
# default_action_permissions_sql consults this set when emitting its
# default allow rules.
DEFAULT_ALLOW_ACTIONS = frozenset(
    {
        "view-instance",
        "view-database",
        "view-database-download",
        "view-table",
        "view-query",
        "execute-sql",
    }
)
@hookimpl(specname="permission_resources_sql")
async def default_allow_sql_check(
    datasette: "Datasette",
    actor: Optional[dict],
    action: str,
) -> Optional[PermissionSQL]:
    """
    Enforce the default_allow_sql setting.

    When default_allow_sql is false (the default), execute-sql is denied
    unless explicitly allowed by config or other rules. Other actions
    return None so other hooks decide.
    """
    if action != "execute-sql":
        return None
    if datasette.setting("default_allow_sql"):
        return None
    return PermissionSQL.deny(reason="default_allow_sql is false")
@hookimpl(specname="permission_resources_sql")
async def default_action_permissions_sql(
    datasette: "Datasette",
    actor: Optional[dict],
    action: str,
) -> Optional[PermissionSQL]:
    """
    Provide default allow rules for standard view/execute actions.

    These defaults are skipped when datasette is started with --default-deny.
    The restriction_sql mechanism (from actor_restrictions_sql) will still
    filter these results if the actor has restrictions.
    """
    if datasette.default_deny:
        return None
    if action not in DEFAULT_ALLOW_ACTIONS:
        return None
    # Single quotes are doubled so the reason is safe inside SQL literals
    return PermissionSQL.allow(
        reason="default allow for {}".format(action).replace("'", "''")
    )

View file

@ -1,85 +0,0 @@
"""
Shared helper utilities for default permission implementations.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Set
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette.permissions import PermissionSQL
def get_action_name_variants(datasette: "Datasette", action: str) -> Set[str]:
    """
    Get all name variants for an action (full name and abbreviation).

    Example:
        get_action_name_variants(ds, "view-table") -> {"view-table", "vt"}
    """
    names = {action}
    registered = datasette.actions.get(action)
    if registered and registered.abbr:
        names.add(registered.abbr)
    return names
def action_in_list(datasette: "Datasette", action: str, action_list: list) -> bool:
    """Check if an action (or its abbreviation) is in a list."""
    return not get_action_name_variants(datasette, action).isdisjoint(action_list)
@dataclass
class PermissionRow:
    """One allow/deny verdict for a (parent, child) resource pair."""

    parent: Optional[str]
    child: Optional[str]
    allow: bool
    reason: str


class PermissionRowCollector:
    """Accumulates PermissionRow entries and renders them as PermissionSQL."""

    def __init__(self, prefix: str = "row"):
        # prefix namespaces the generated SQL parameter names so that
        # several collectors can contribute to one combined query
        self.rows: List[PermissionRow] = []
        self.prefix = prefix

    def add(
        self,
        parent: Optional[str],
        child: Optional[str],
        allow: Optional[bool],
        reason: str,
        if_not_none: bool = False,
    ) -> None:
        """Record a rule; with if_not_none=True, a None verdict is dropped."""
        if allow is None and if_not_none:
            return
        self.rows.append(PermissionRow(parent, child, allow, reason))

    def to_permission_sql(self) -> Optional[PermissionSQL]:
        """Render the collected rows as a UNION ALL of one-row SELECTs.

        Returns None when no rows were collected.
        """
        if not self.rows:
            return None
        selects = []
        params = {}
        for idx, row in enumerate(self.rows):
            key = f"{self.prefix}_{idx}"
            selects.append(
                f"SELECT :{key}_parent AS parent, :{key}_child AS child, "
                f":{key}_allow AS allow, :{key}_reason AS reason"
            )
            params.update(
                {
                    f"{key}_parent": row.parent,
                    f"{key}_child": row.child,
                    f"{key}_allow": 1 if row.allow else 0,
                    f"{key}_reason": row.reason,
                }
            )
        return PermissionSQL(sql="\nUNION ALL\n".join(selects), params=params)

View file

@ -1,195 +0,0 @@
"""
Actor restriction handling for Datasette permissions.
This module handles the _r (restrictions) key in actor dictionaries, which
contains allowlists of resources the actor can access.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Set, Tuple
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
from .helpers import action_in_list, get_action_name_variants
@dataclass
class ActorRestrictions:
    """Parsed actor restrictions from the _r key."""

    global_actions: List[str]  # _r.a - globally allowed actions
    database_actions: dict  # _r.d - {db_name: [actions]}
    table_actions: dict  # _r.r - {db_name: {table: [actions]}}

    @classmethod
    def from_actor(cls, actor: Optional[dict]) -> Optional["ActorRestrictions"]:
        """Parse restrictions from actor dict. Returns None if no restrictions."""
        if not actor:
            return None
        assert isinstance(actor, dict), "actor must be a dictionary"
        raw = actor.get("_r")
        if raw is None:
            return None
        return cls(
            global_actions=raw.get("a", []),
            database_actions=raw.get("d", {}),
            table_actions=raw.get("r", {}),
        )

    def is_action_globally_allowed(self, datasette: "Datasette", action: str) -> bool:
        """Check if action is in the global allowlist."""
        return action_in_list(datasette, action, self.global_actions)

    def get_allowed_databases(self, datasette: "Datasette", action: str) -> Set[str]:
        """Get database names where this action is allowed."""
        return {
            db_name
            for db_name, db_actions in self.database_actions.items()
            if action_in_list(datasette, action, db_actions)
        }

    def get_allowed_tables(
        self, datasette: "Datasette", action: str
    ) -> Set[Tuple[str, str]]:
        """Get (database, table) pairs where this action is allowed."""
        return {
            (db_name, table_name)
            for db_name, tables in self.table_actions.items()
            for table_name, table_actions in tables.items()
            if action_in_list(datasette, action, table_actions)
        }
@hookimpl(specname="permission_resources_sql")
async def actor_restrictions_sql(
    datasette: "Datasette",
    actor: Optional[dict],
    action: str,
) -> Optional[List[PermissionSQL]]:
    """
    Handle actor restriction-based permission rules.

    When an actor has an "_r" key, it contains an allowlist of resources they
    can access. This function returns restriction_sql that filters the final
    results to only include resources in that allowlist.

    The _r structure:

        {
            "a": ["vi", "pd"],               # Global actions allowed
            "d": {"mydb": ["vt", "es"]},     # Database-level actions
            "r": {"mydb": {"users": ["vt"]}} # Table-level actions
        }

    Returns None for anonymous actors, [] when no filtering is needed,
    or a single-element list carrying the restriction SQL.
    """
    if not actor:
        return None
    restrictions = ActorRestrictions.from_actor(actor)
    if restrictions is None:
        # No restrictions - all resources allowed
        return []
    # If globally allowed, no filtering needed
    if restrictions.is_action_globally_allowed(datasette, action):
        return []
    # Build restriction SQL
    allowed_dbs = restrictions.get_allowed_databases(datasette, action)
    allowed_tables = restrictions.get_allowed_tables(datasette, action)
    # If nothing is allowed for this action, return empty-set restriction
    # (WHERE 0 yields no rows, so the INTERSECT denies everything)
    if not allowed_dbs and not allowed_tables:
        return [
            PermissionSQL(
                params={"deny": f"actor restrictions: {action} not in allowlist"},
                restriction_sql="SELECT NULL AS parent, NULL AS child WHERE 0",
            )
        ]
    # Build UNION of allowed resources; counter keeps parameter names unique
    selects = []
    params = {}
    counter = 0
    # Database-level entries (parent, NULL) - allows all children
    for db_name in allowed_dbs:
        key = f"restr_{counter}"
        counter += 1
        selects.append(f"SELECT :{key}_parent AS parent, NULL AS child")
        params[f"{key}_parent"] = db_name
    # Table-level entries (parent, child)
    for db_name, table_name in allowed_tables:
        key = f"restr_{counter}"
        counter += 1
        selects.append(f"SELECT :{key}_parent AS parent, :{key}_child AS child")
        params[f"{key}_parent"] = db_name
        params[f"{key}_child"] = table_name
    restriction_sql = "\nUNION ALL\n".join(selects)
    return [PermissionSQL(params=params, restriction_sql=restriction_sql)]
def restrictions_allow_action(
    datasette: "Datasette",
    restrictions: dict,
    action: str,
    resource: Optional[str | Tuple[str, str]],
) -> bool:
    """
    Check if restrictions allow the requested action on the requested resource.

    This is a synchronous utility function for use by other code that needs
    to quickly check restriction allowlists.

    Args:
        datasette: The Datasette instance
        restrictions: The _r dict from an actor
        action: The action name to check
        resource: None for global, str for database, (db, table) tuple for table

    Returns:
        True if allowed, False if denied
    """
    # Match against the action name and its abbreviation, if any
    names = get_action_name_variants(datasette, action)

    def listed(allowlist) -> bool:
        # An absent allowlist (None) never matches; a present one must be a list
        if allowlist is None:
            return False
        assert isinstance(allowlist, list)
        return bool(names.intersection(allowlist))

    # Global allowlist (_r.a) covers every resource
    if listed(restrictions.get("a")):
        return True
    # Database allowlist (_r.d) - resource is a name or a (db, table) pair
    if resource:
        db_name = resource if isinstance(resource, str) else resource[0]
        if listed(restrictions.get("d", {}).get(db_name)):
            return True
    # Table allowlist (_r.r) - only applies to (db, table) pairs
    if resource is not None and not isinstance(resource, str) and len(resource) == 2:
        db_name, table_name = resource
        if listed(restrictions.get("r", {}).get(db_name, {}).get(table_name)):
            return True
    # Restrictions are an allowlist: anything not explicitly allowed is denied
    return False

View file

@ -1,29 +0,0 @@
"""
Root user permission handling for Datasette.
Grants full permissions to the root user when --root flag is used.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
from datasette import hookimpl
from datasette.permissions import PermissionSQL
@hookimpl(specname="permission_resources_sql")
async def root_user_permissions_sql(
    datasette: "Datasette",
    actor: Optional[dict],
) -> Optional[PermissionSQL]:
    """
    Grant root user full permissions when --root flag is used.

    Returns None (no opinion) unless root mode is enabled and the
    authenticated actor's id is "root".
    """
    if not datasette.root_enabled:
        return None
    if actor is None or actor.get("id") != "root":
        return None
    return PermissionSQL.allow(reason="root user")

View file

@ -1,95 +0,0 @@
"""
Token authentication for Datasette.
Handles signed API tokens (dstok_ prefix).
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from datasette.app import Datasette
import itsdangerous
from datasette import hookimpl
@hookimpl(specname="actor_from_request")
def actor_from_signed_api_token(datasette: "Datasette", request) -> Optional[dict]:
    """
    Authenticate requests using signed API tokens (dstok_ prefix).

    Token structure (signed JSON):

        {
            "a": "actor_id",  # Actor ID
            "t": 1234567890,  # Timestamp (Unix epoch)
            "d": 3600,        # Optional: Duration in seconds
            "_r": {...}       # Optional: Restrictions
        }

    Returns an actor dict (with "_r" and "token_expires" copied over when
    applicable) or None if the request carries no valid, unexpired token.
    """
    prefix = "dstok_"
    # Check if tokens are enabled
    if not datasette.setting("allow_signed_tokens"):
        return None
    max_signed_tokens_ttl = datasette.setting("max_signed_tokens_ttl")
    # Get authorization header - must be "Bearer dstok_..."
    authorization = request.headers.get("authorization")
    if not authorization:
        return None
    if not authorization.startswith("Bearer "):
        return None
    token = authorization[len("Bearer ") :]
    if not token.startswith(prefix):
        return None
    # Remove prefix and verify signature
    token = token[len(prefix) :]
    try:
        decoded = datasette.unsign(token, namespace="token")
    except itsdangerous.BadSignature:
        return None
    # Validate timestamp - required and must be an int
    if "t" not in decoded:
        return None
    created = decoded["t"]
    if not isinstance(created, int):
        return None
    # Handle duration/expiry - optional, but must be an int when present
    duration = decoded.get("d")
    if duration is not None and not isinstance(duration, int):
        return None
    # Apply max TTL if configured: tokens with no duration, or a duration
    # exceeding the cap, are clamped to max_signed_tokens_ttl
    if (duration is None and max_signed_tokens_ttl) or (
        duration is not None
        and max_signed_tokens_ttl
        and duration > max_signed_tokens_ttl
    ):
        duration = max_signed_tokens_ttl
    # Check expiry against the current clock
    if duration:
        if time.time() - created > duration:
            return None
    # Build actor dict
    actor = {"id": decoded["a"], "token": "dstok"}
    # Copy restrictions if present
    if "_r" in decoded:
        actor["_r"] = decoded["_r"]
    # Add expiry timestamp if applicable
    if duration:
        actor["token_expires"] = created + duration
    return actor

View file

@ -1,33 +1,6 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, NamedTuple
import contextvars
# Context variable to track when permission checks should be skipped
_skip_permission_checks = contextvars.ContextVar(
"skip_permission_checks", default=False
)
class SkipPermissions:
"""Context manager to temporarily skip permission checks.
This is not a stable API and may change in future releases.
Usage:
with SkipPermissions():
# Permission checks are skipped within this block
response = await datasette.client.get("/protected")
"""
def __enter__(self):
self.token = _skip_permission_checks.set(True)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
_skip_permission_checks.reset(self.token)
return False
class Resource(ABC):

View file

@ -94,24 +94,21 @@ def get_plugins():
for plugin in pm.get_plugins():
static_path = None
templates_path = None
plugin_name = (
plugin.__name__
if hasattr(plugin, "__name__")
else plugin.__class__.__name__
)
if plugin_name not in DEFAULT_PLUGINS:
if plugin.__name__ not in DEFAULT_PLUGINS:
try:
if (importlib_resources.files(plugin_name) / "static").is_dir():
static_path = str(importlib_resources.files(plugin_name) / "static")
if (importlib_resources.files(plugin_name) / "templates").is_dir():
if (importlib_resources.files(plugin.__name__) / "static").is_dir():
static_path = str(
importlib_resources.files(plugin.__name__) / "static"
)
if (importlib_resources.files(plugin.__name__) / "templates").is_dir():
templates_path = str(
importlib_resources.files(plugin_name) / "templates"
importlib_resources.files(plugin.__name__) / "templates"
)
except (TypeError, ModuleNotFoundError):
# Caused by --plugins_dir= plugins
pass
plugin_info = {
"name": plugin_name,
"name": plugin.__name__,
"static_path": static_path,
"templates_path": templates_path,
"hooks": [h.name for h in pm.get_hookcallers(plugin)],

View file

@ -3,7 +3,7 @@ import click
import json
import os
import re
from subprocess import CalledProcessError, check_call, check_output
from subprocess import check_call, check_output
from .common import (
add_common_publish_arguments_and_options,
@ -23,9 +23,7 @@ def publish_subcommand(publish):
help="Application name to use when building",
)
@click.option(
"--service",
default="",
help="Cloud Run service to deploy (or over-write)",
"--service", default="", help="Cloud Run service to deploy (or over-write)"
)
@click.option("--spatialite", is_flag=True, help="Enable SpatialLite extension")
@click.option(
@ -57,32 +55,13 @@ def publish_subcommand(publish):
@click.option(
"--max-instances",
type=int,
default=1,
show_default=True,
help="Maximum Cloud Run instances (use 0 to remove the limit)",
help="Maximum Cloud Run instances",
)
@click.option(
"--min-instances",
type=int,
help="Minimum Cloud Run instances",
)
@click.option(
"--artifact-repository",
default="datasette",
show_default=True,
help="Artifact Registry repository to store the image",
)
@click.option(
"--artifact-region",
default="us",
show_default=True,
help="Artifact Registry location (region or multi-region)",
)
@click.option(
"--artifact-project",
default=None,
help="Project ID for Artifact Registry (defaults to the active project)",
)
def cloudrun(
files,
metadata,
@ -112,9 +91,6 @@ def publish_subcommand(publish):
apt_get_extras,
max_instances,
min_instances,
artifact_repository,
artifact_region,
artifact_project,
):
"Publish databases to Datasette running on Cloud Run"
fail_if_publish_binary_not_installed(
@ -124,21 +100,6 @@ def publish_subcommand(publish):
"gcloud config get-value project", shell=True, universal_newlines=True
).strip()
artifact_project = artifact_project or project
# Ensure Artifact Registry exists for the target image
_ensure_artifact_registry(
artifact_project=artifact_project,
artifact_region=artifact_region,
artifact_repository=artifact_repository,
)
artifact_host = (
artifact_region
if artifact_region.endswith("-docker.pkg.dev")
else f"{artifact_region}-docker.pkg.dev"
)
if not service:
# Show the user their current services, then prompt for one
click.echo("Please provide a service name for this deployment\n")
@ -156,11 +117,6 @@ def publish_subcommand(publish):
click.echo("")
service = click.prompt("Service name", type=str)
image_id = (
f"{artifact_host}/{artifact_project}/"
f"{artifact_repository}/datasette-{service}"
)
extra_metadata = {
"title": title,
"license": license,
@ -217,6 +173,7 @@ def publish_subcommand(publish):
print(fp.read())
print("\n====================\n")
image_id = f"gcr.io/{project}/datasette-{service}"
check_call(
"gcloud builds submit --tag {}{}".format(
image_id, " --timeout {}".format(timeout) if timeout else ""
@ -230,7 +187,7 @@ def publish_subcommand(publish):
("--max-instances", max_instances),
("--min-instances", min_instances),
):
if value is not None:
if value:
extra_deploy_options.append("{} {}".format(option, value))
check_call(
"gcloud run deploy --allow-unauthenticated --platform=managed --image {} {}{}".format(
@ -242,52 +199,6 @@ def publish_subcommand(publish):
)
def _ensure_artifact_registry(artifact_project, artifact_region, artifact_repository):
"""Ensure Artifact Registry API is enabled and the repository exists."""
enable_cmd = (
"gcloud services enable artifactregistry.googleapis.com "
f"--project {artifact_project} --quiet"
)
try:
check_call(enable_cmd, shell=True)
except CalledProcessError as exc:
raise click.ClickException(
"Failed to enable artifactregistry.googleapis.com. "
"Please ensure you have permissions to manage services."
) from exc
describe_cmd = (
"gcloud artifacts repositories describe {repo} --project {project} "
"--location {location} --quiet"
).format(
repo=artifact_repository,
project=artifact_project,
location=artifact_region,
)
try:
check_call(describe_cmd, shell=True)
return
except CalledProcessError:
create_cmd = (
"gcloud artifacts repositories create {repo} --repository-format=docker "
'--location {location} --project {project} --description "Datasette Cloud Run images" --quiet'
).format(
repo=artifact_repository,
location=artifact_region,
project=artifact_project,
)
try:
check_call(create_cmd, shell=True)
click.echo(f"Created Artifact Registry repository '{artifact_repository}'")
except CalledProcessError as exc:
raise click.ClickException(
"Failed to create Artifact Registry repository. "
"Use --artifact-repository/--artifact-region to point to an existing repo "
"or create one manually."
) from exc
def get_existing_services():
services = json.loads(
check_output(
@ -303,7 +214,6 @@ def get_existing_services():
"url": service["status"]["address"]["url"],
}
for service in services
if "url" in service["status"]
]

View file

@ -56,7 +56,7 @@
{% endif %}
{% if tables %}
<h2 id="tables">Tables <a style="font-weight: normal; font-size: 0.75em; padding-left: 0.5em;" href="{{ urls.database(database) }}/-/schema">schema</a></h2>
<h2 id="tables">Tables</h2>
{% endif %}
{% for table in tables %}

View file

@ -31,7 +31,7 @@
<td><strong>{{ action.name }}</strong></td>
<td>{% if action.abbr %}<code>{{ action.abbr }}</code>{% endif %}</td>
<td>{{ action.description or "" }}</td>
<td>{% if action.resource_class %}<code>{{ action.resource_class }}</code>{% endif %}</td>
<td><code>{{ action.resource_class }}</code></td>
<td>{% if action.takes_parent %}✓{% endif %}</td>
<td>{% if action.takes_child %}✓{% endif %}</td>
<td>{% if action.also_requires %}<code>{{ action.also_requires }}</code>{% endif %}</td>

View file

@ -1,41 +0,0 @@
{% extends "base.html" %}
{% block title %}{% if is_instance %}Schema for all databases{% elif table_name %}Schema for {{ schemas[0].database }}.{{ table_name }}{% else %}Schema for {{ schemas[0].database }}{% endif %}{% endblock %}
{% block body_class %}schema{% endblock %}
{% block crumbs %}
{% if is_instance %}
{{ crumbs.nav(request=request) }}
{% elif table_name %}
{{ crumbs.nav(request=request, database=schemas[0].database, table=table_name) }}
{% else %}
{{ crumbs.nav(request=request, database=schemas[0].database) }}
{% endif %}
{% endblock %}
{% block content %}
<div class="page-header">
<h1>{% if is_instance %}Schema for all databases{% elif table_name %}Schema for {{ table_name }}{% else %}Schema for {{ schemas[0].database }}{% endif %}</h1>
</div>
{% for item in schemas %}
{% if is_instance %}
<h2>{{ item.database }}</h2>
{% endif %}
{% if item.schema %}
<pre style="background-color: #f5f5f5; padding: 1em; overflow-x: auto; border: 1px solid #ddd; border-radius: 4px;"><code>{{ item.schema }}</code></pre>
{% else %}
<p><em>No schema available for this database.</em></p>
{% endif %}
{% if not loop.last %}
<hr style="margin: 2em 0;">
{% endif %}
{% endfor %}
{% if not schemas %}
<p><em>No databases with viewable schemas found.</em></p>
{% endif %}
{% endblock %}

View file

@ -155,16 +155,6 @@ async def _build_single_action_sql(
action=action,
)
# If permission_sqls is the sentinel, skip all permission checks
# Return SQL that allows all resources
from datasette.utils.permissions import SKIP_PERMISSION_CHECKS
if permission_sqls is SKIP_PERMISSION_CHECKS:
cols = "parent, child, 'skip_permission_checks' AS reason"
if include_is_private:
cols += ", 0 AS is_private"
return f"SELECT {cols} FROM ({base_resources_sql})", {}
all_params = {}
rule_sqls = []
restriction_sqls = []
@ -446,17 +436,6 @@ async def build_permission_rules_sql(
action=action,
)
# If permission_sqls is the sentinel, skip all permission checks
# Return SQL that allows everything
from datasette.utils.permissions import SKIP_PERMISSION_CHECKS
if permission_sqls is SKIP_PERMISSION_CHECKS:
return (
"SELECT NULL AS parent, NULL AS child, 1 AS allow, 'skip_permission_checks' AS reason, 'skip' AS source_plugin",
{},
[],
)
if not permission_sqls:
return (
"SELECT NULL AS parent, NULL AS child, 0 AS allow, NULL AS reason, NULL AS source_plugin WHERE 0",

View file

@ -6,7 +6,6 @@ from pathlib import Path
from http.cookies import SimpleCookie, Morsel
import aiofiles
import aiofiles.os
import re
# Workaround for adding samesite support to pre 3.8 python
Morsel._reserved["samesite"] = "SameSite"
@ -249,9 +248,6 @@ async def asgi_send_html(send, html, status=200, headers=None):
async def asgi_send_redirect(send, location, status=302):
# Prevent open redirect vulnerability: strip multiple leading slashes
# //example.com would be interpreted as a protocol-relative URL (e.g., https://example.com/)
location = re.sub(r"^/+", "/", location)
await asgi_send(
send,
"",

View file

@ -10,26 +10,13 @@ from datasette.plugins import pm
from datasette.utils import await_me_maybe
# Sentinel object to indicate permission checks should be skipped
SKIP_PERMISSION_CHECKS = object()
async def gather_permission_sql_from_hooks(
*, datasette, actor: dict | None, action: str
) -> List[PermissionSQL] | object:
) -> List[PermissionSQL]:
"""Collect PermissionSQL objects from the permission_resources_sql hook.
Ensures that each returned PermissionSQL has a populated ``source``.
Returns SKIP_PERMISSION_CHECKS sentinel if skip_permission_checks context variable
is set, signaling that all permission checks should be bypassed.
"""
from datasette.permissions import _skip_permission_checks
# Check if we should skip permission checks BEFORE calling hooks
# This avoids creating unawaited coroutines
if _skip_permission_checks.get():
return SKIP_PERMISSION_CHECKS
hook_caller = pm.hook.permission_resources_sql
hookimpls = hook_caller.get_hookimpls()

View file

@ -1,2 +1,2 @@
__version__ = "1.0a23"
__version__ = "1.0a19"
__version_info__ = tuple(__version__.split("."))

View file

@ -761,6 +761,8 @@ class ApiExplorerView(BaseView):
async def example_links(self, request):
databases = []
for name, db in self.ds.databases.items():
if name == "_internal":
continue
database_visible, _ = await self.ds.check_visibility(
request.actor,
action="view-database",
@ -979,180 +981,3 @@ class TablesView(BaseView):
]
return Response.json({"matches": matches, "truncated": truncated})
class SchemaBaseView(BaseView):
"""Base class for schema views with common response formatting."""
has_json_alternate = False
async def get_database_schema(self, database_name):
"""Get schema SQL for a database."""
db = self.ds.databases[database_name]
result = await db.execute(
"select group_concat(sql, ';' || CHAR(10)) as schema from sqlite_master where sql is not null"
)
row = result.first()
return row["schema"] if row and row["schema"] else ""
def format_json_response(self, data):
"""Format data as JSON response with CORS headers if needed."""
headers = {}
if self.ds.cors:
add_cors_headers(headers)
return Response.json(data, headers=headers)
def format_error_response(self, error_message, format_, status=404):
"""Format error response based on requested format."""
if format_ == "json":
headers = {}
if self.ds.cors:
add_cors_headers(headers)
return Response.json(
{"ok": False, "error": error_message}, status=status, headers=headers
)
else:
return Response.text(error_message, status=status)
def format_markdown_response(self, heading, schema):
"""Format schema as Markdown response."""
md_output = f"# {heading}\n\n```sql\n{schema}\n```\n"
return Response.text(
md_output, headers={"content-type": "text/markdown; charset=utf-8"}
)
async def format_html_response(
self, request, schemas, is_instance=False, table_name=None
):
"""Format schema as HTML response."""
context = {
"schemas": schemas,
"is_instance": is_instance,
}
if table_name:
context["table_name"] = table_name
return await self.render(["schema.html"], request=request, context=context)
class InstanceSchemaView(SchemaBaseView):
"""
Displays schema for all databases in the instance.
Supports HTML, JSON, and Markdown formats.
"""
name = "instance_schema"
async def get(self, request):
format_ = request.url_vars.get("format") or "html"
# Get all databases the actor can view
allowed_databases_page = await self.ds.allowed_resources(
"view-database",
request.actor,
)
allowed_databases = [r.parent async for r in allowed_databases_page.all()]
# Get schema for each database
schemas = []
for database_name in allowed_databases:
schema = await self.get_database_schema(database_name)
schemas.append({"database": database_name, "schema": schema})
if format_ == "json":
return self.format_json_response({"schemas": schemas})
elif format_ == "md":
md_parts = [
f"# Schema for {item['database']}\n\n```sql\n{item['schema']}\n```"
for item in schemas
]
return Response.text(
"\n\n".join(md_parts),
headers={"content-type": "text/markdown; charset=utf-8"},
)
else:
return await self.format_html_response(request, schemas, is_instance=True)
class DatabaseSchemaView(SchemaBaseView):
"""
Displays schema for a specific database.
Supports HTML, JSON, and Markdown formats.
"""
name = "database_schema"
async def get(self, request):
database_name = request.url_vars["database"]
format_ = request.url_vars.get("format") or "html"
# Check if database exists
if database_name not in self.ds.databases:
return self.format_error_response("Database not found", format_)
# Check view-database permission
await self.ds.ensure_permission(
action="view-database",
resource=DatabaseResource(database=database_name),
actor=request.actor,
)
schema = await self.get_database_schema(database_name)
if format_ == "json":
return self.format_json_response(
{"database": database_name, "schema": schema}
)
elif format_ == "md":
return self.format_markdown_response(f"Schema for {database_name}", schema)
else:
schemas = [{"database": database_name, "schema": schema}]
return await self.format_html_response(request, schemas)
class TableSchemaView(SchemaBaseView):
"""
Displays schema for a specific table.
Supports HTML, JSON, and Markdown formats.
"""
name = "table_schema"
async def get(self, request):
database_name = request.url_vars["database"]
table_name = request.url_vars["table"]
format_ = request.url_vars.get("format") or "html"
# Check view-table permission
await self.ds.ensure_permission(
action="view-table",
resource=TableResource(database=database_name, table=table_name),
actor=request.actor,
)
# Get schema for the table
db = self.ds.databases[database_name]
result = await db.execute(
"select sql from sqlite_master where name = ? and sql is not null",
[table_name],
)
row = result.first()
# Return 404 if table doesn't exist
if not row or not row["sql"]:
return self.format_error_response("Table not found", format_)
schema = row["sql"]
if format_ == "json":
return self.format_json_response(
{"database": database_name, "table": table_name, "schema": schema}
)
elif format_ == "md":
return self.format_markdown_response(
f"Schema for {database_name}.{table_name}", schema
)
else:
schemas = [{"database": database_name, "schema": schema}]
return await self.format_html_response(
request, schemas, table_name=table_name
)

View file

@ -83,39 +83,6 @@ Datasette's built-in view actions (``view-database``, ``view-table`` etc) are al
Other actions, including those introduced by plugins, will default to *deny*.
.. _authentication_default_deny:
Denying all permissions by default
----------------------------------
By default, Datasette allows unauthenticated access to view databases, tables, and execute SQL queries.
You may want to run Datasette in a mode where **all** access is denied by default, and you explicitly grant permissions only to authenticated users, either using the :ref:`--root mechanism <authentication_root>` or through :ref:`configuration file rules <authentication_permissions_config>` or plugins.
Use the ``--default-deny`` command-line option to run Datasette in this mode::
datasette --default-deny data.db --root
With ``--default-deny`` enabled:
* Anonymous users are denied access to view the instance, databases, tables, and queries
* Authenticated users are also denied access unless they're explicitly granted permissions
* The root user (when using ``--root``) still has access to everything
* You can grant permissions using :ref:`configuration file rules <authentication_permissions_config>` or plugins
For example, to allow only a specific user to access your instance::
datasette --default-deny data.db --config datasette.yaml
Where ``datasette.yaml`` contains:
.. code-block:: yaml
allow:
id: alice
This configuration will deny access to everyone except the user with ``id`` of ``alice``.
.. _authentication_permissions_explained:
How permissions are resolved

View file

@ -4,43 +4,6 @@
Changelog
=========
.. _v1_0_a23:
1.0a23 (2025-12-02)
-------------------
- Fix for bug where a stale database entry in ``internal.db`` could cause a 500 error on the homepage. (:issue:`2605`)
- Cosmetic improvement to ``/-/actions`` page. (:issue:`2599`)
.. _v1_0_a22:
1.0a22 (2025-11-13)
-------------------
- ``datasette serve --default-deny`` option for running Datasette configured to :ref:`deny all permissions by default <authentication_default_deny>`. (:issue:`2592`)
- ``datasette.is_client()`` method for detecting if code is :ref:`executing inside a datasette.client request <internals_datasette_is_client>`. (:issue:`2594`)
- ``datasette.pm`` property can now be used to :ref:`register and unregister plugins in tests <testing_plugins_register_in_test>`. (:issue:`2595`)
.. _v1_0_a21:
1.0a21 (2025-11-05)
-------------------
- Fixes an **open redirect** security issue: Datasette instances would redirect to ``example.com/foo/bar`` if you accessed the path ``//example.com/foo/bar``. Thanks to `James Jefferies <https://github.com/jamesjefferies>`__ for the fix. (:issue:`2429`)
- Fixed ``datasette publish cloudrun`` to work with changes to the underlying Cloud Run architecture. (:issue:`2511`)
- New ``datasette --get /path --headers`` option for inspecting the headers returned by a path. (:issue:`2578`)
- New ``datasette.client.get(..., skip_permission_checks=True)`` parameter to bypass permission checks when making requests using the internal client. (:issue:`2583`)
.. _v0_65_2:
0.65.2 (2025-11-05)
-------------------
- Fixes an **open redirect** security issue: Datasette instances would redirect to ``example.com/foo/bar`` if you accessed the path ``//example.com/foo/bar``. Thanks to `James Jefferies <https://github.com/jamesjefferies>`__ for the fix. (:issue:`2429`)
- Upgraded for compatibility with Python 3.14.
- Fixed ``datasette publish cloudrun`` to work with changes to the underlying Cloud Run architecture. (:issue:`2511`)
- Minor upgrades to fix warnings, including ``pkg_resources`` deprecation.
.. _v1_0_a20:
1.0a20 (2025-11-03)
@ -79,16 +42,22 @@ Related changes:
- Permission debugging improvements:
- The ``/-/allowed`` endpoint shows resources the user is allowed to interact with for different actions.
- ``/-/rules`` shows the raw allow/deny rules that apply to different permission checks.
- ``/-/actions`` lists every available action.
- ``/-/check`` can be used to try out different permission checks for the current actor.
Other changes
~~~~~~~~~~~~~
- The internal ``catalog_views`` table now tracks SQLite views alongside tables in the introspection database. (:issue:`2495`)
- Hitting the ``/`` brings up a search interface for navigating to tables that the current user can view. A new ``/-/tables`` endpoint supports this functionality. (:issue:`2523`)
- Datasette attempts to detect some configuration errors on startup.
- Datasette now supports Python 3.14 and no longer tests against Python 3.9.
.. _v1_0_a19:
@ -295,7 +264,7 @@ To avoid similar mistakes in the future the ``datasette.permission_allowed()`` m
Permission checks now consider opinions from every plugin
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ``datasette.permission_allowed()`` method previously consulted every plugin that implemented the ``permission_allowed()`` plugin hook and obeyed the opinion of the last plugin to return a value. (:issue:`2275`)
The ``datasette.permission_allowed()`` method previously consulted every plugin that implemented the :ref:`permission_allowed() <plugin_hook_permission_allowed>` plugin hook and obeyed the opinion of the last plugin to return a value. (:issue:`2275`)
Datasette now consults every plugin and checks to see if any of them returned ``False`` (the veto rule), and if none of them did, it then checks to see if any of them returned ``True``.
@ -1414,7 +1383,7 @@ You can use the new ``"allow"`` block syntax in ``metadata.json`` (or ``metadata
See :ref:`authentication_permissions_allow` for more details.
Plugins can implement their own custom permission checks using the new ``plugin_hook_permission_allowed()`` plugin hook.
Plugins can implement their own custom permission checks using the new :ref:`plugin_hook_permission_allowed` hook.
A new debug page at ``/-/permissions`` shows recent permission checks, to help administrators and plugin authors understand exactly what checks are being performed. This tool defaults to only being available to the root user, but can be exposed to other users by plugins that respond to the ``permissions-debug`` permission. (:issue:`788`)

View file

@ -119,10 +119,8 @@ Once started you can access it at ``http://localhost:8001``
signed cookies
--root Output URL that sets a cookie authenticating
the root user
--default-deny Deny all permissions by default
--get TEXT Run an HTTP GET request against this path,
print results and exit
--headers Include HTTP headers in --get output
--token TEXT API token to send with --get requests
--actor TEXT Actor to use for --get requests (JSON string)
--version-note TEXT Additional note to show on /-/versions
@ -490,15 +488,8 @@ See :ref:`publish_cloud_run`.
--cpu [1|2|4] Number of vCPUs to allocate in Cloud Run
--timeout INTEGER Build timeout in seconds
--apt-get-install TEXT Additional packages to apt-get install
--max-instances INTEGER Maximum Cloud Run instances (use 0 to remove
the limit) [default: 1]
--max-instances INTEGER Maximum Cloud Run instances
--min-instances INTEGER Minimum Cloud Run instances
--artifact-repository TEXT Artifact Registry repository to store the
image [default: datasette]
--artifact-region TEXT Artifact Registry location (region or multi-
region) [default: us]
--artifact-project TEXT Project ID for Artifact Registry (defaults to
the active project)
--help Show this message and exit.

View file

@ -20,7 +20,7 @@ General guidelines
Setting up a development environment
------------------------------------
If you have Python 3.10 or higher installed on your computer (on OS X the quickest way to do this `is using homebrew <https://docs.python-guide.org/starting/install3/osx/>`__) you can install an editable copy of Datasette using the following steps.
If you have Python 3.8 or higher installed on your computer (on OS X the quickest way to do this `is using homebrew <https://docs.python-guide.org/starting/install3/osx/>`__) you can install an editable copy of Datasette using the following steps.
If you want to use GitHub to publish your changes, first `create a fork of datasette <https://github.com/simonw/datasette/fork>`__ under your own GitHub account.

View file

@ -781,8 +781,8 @@ Use ``is_mutable=False`` to add an immutable database.
.. _datasette_add_memory_database:
.add_memory_database(memory_name, name=None, route=None)
--------------------------------------------------------
.add_memory_database(name)
--------------------------
Adds a shared in-memory database with the specified name:
@ -800,9 +800,7 @@ This is a shortcut for the following:
Database(datasette, memory_name="statistics")
)
Using either of these patterns will result in the in-memory database being served at ``/statistics``.
The ``name`` and ``route`` parameters are optional and work the same way as they do for :ref:`datasette_add_database`.
Using either of these pattern will result in the in-memory database being served at ``/statistics``.
.. _datasette_remove_database:
@ -1047,60 +1045,6 @@ These methods can be used with :ref:`internals_datasette_urls` - for example:
For documentation on available ``**kwargs`` options and the shape of the HTTPX Response object refer to the `HTTPX Async documentation <https://www.python-httpx.org/async/>`__.
Bypassing permission checks
~~~~~~~~~~~~~~~~~~~~~~~~~~~
All ``datasette.client`` methods accept an optional ``skip_permission_checks=True`` parameter. When set, all permission checks will be bypassed for that request, allowing access to any resource regardless of the configured permissions.
This is useful for plugins and internal operations that need to access all resources without being subject to permission restrictions.
Example usage:
.. code-block:: python
# Regular request - respects permissions
response = await datasette.client.get(
"/private-db/secret-table.json"
)
# May return 403 Forbidden if access is denied
# With skip_permission_checks - bypasses all permission checks
response = await datasette.client.get(
"/private-db/secret-table.json",
skip_permission_checks=True,
)
# Will return 200 OK and the data, regardless of permissions
This parameter works with all HTTP methods (``get``, ``post``, ``put``, ``patch``, ``delete``, ``options``, ``head``) and the generic ``request`` method.
.. warning::
Use ``skip_permission_checks=True`` with caution. It completely bypasses Datasette's permission system and should only be used in trusted plugin code or internal operations where you need guaranteed access to resources.
.. _internals_datasette_is_client:
Detecting internal client requests
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
``datasette.in_client()`` - returns bool
Returns ``True`` if the current code is executing within a ``datasette.client`` request, ``False`` otherwise.
This method is useful for plugins that need to behave differently when called through ``datasette.client`` versus when handling external HTTP requests.
Example usage:
.. code-block:: python
async def fetch_documents(datasette):
if not datasette.in_client():
return Response.text(
"Only available via internal client requests",
status=403,
)
...
Note that ``datasette.in_client()`` is independent of ``skip_permission_checks``. A request made through ``datasette.client`` will always have ``in_client()`` return ``True``, regardless of whether ``skip_permission_checks`` is set.
.. _internals_datasette_urls:
datasette.urls

View file

@ -107,46 +107,3 @@ Note that this URL includes the encoded primary key of the record.
Here's that same page as JSON:
`../people/uk~2Eorg~2Epublicwhip~2Fperson~2F10001.json <https://register-of-members-interests.datasettes.com/regmem/people/uk~2Eorg~2Epublicwhip~2Fperson~2F10001.json>`_
.. _pages_schemas:
Schemas
=======
Datasette offers ``/-/schema`` endpoints to expose the SQL schema for databases and tables.
.. _InstanceSchemaView:
Instance schema
---------------
Access ``/-/schema`` to see the complete schema for all attached databases in the Datasette instance.
Use ``/-/schema.md`` to get the same information as Markdown.
Use ``/-/schema.json`` to get the same information as JSON, which looks like this:
.. code-block:: json
{
"schemas": [
{
"database": "content",
"schema": "create table posts ..."
}
}
.. _DatabaseSchemaView:
Database schema
---------------
Use ``/database-name/-/schema`` to see the complete schema for a specific database. The ``.md`` and ``.json`` extensions work here too. The JSON returns an object with ``"database"`` and ``"schema"`` keys.
.. _TableSchemaView:
Table schema
------------
Use ``/database-name/table-name/-/schema`` to see the schema for a specific table. The ``.md`` and ``.json`` extensions work here too. The JSON returns an object with ``"database"``, ``"table"``, and ``"schema"`` keys.

View file

@ -1314,6 +1314,72 @@ This example plugin causes 0 results to be returned if ``?_nothing=1`` is added
Example: `datasette-leaflet-freedraw <https://datasette.io/plugins/datasette-leaflet-freedraw>`_
.. _plugin_hook_permission_allowed:
permission_allowed(datasette, actor, action, resource)
------------------------------------------------------
``datasette`` - :ref:`internals_datasette`
You can use this to access plugin configuration options via ``datasette.plugin_config(your_plugin_name)``, or to execute SQL queries.
``actor`` - dictionary
The current actor, as decided by :ref:`plugin_hook_actor_from_request`.
``action`` - string
The action to be performed, e.g. ``"edit-table"``.
``resource`` - string or None
An identifier for the individual resource, e.g. the name of the table.
Called to check that an actor has permission to perform an action on a resource. Can return ``True`` if the action is allowed, ``False`` if the action is not allowed or ``None`` if the plugin does not have an opinion one way or the other.
Here's an example plugin which randomly selects if a permission should be allowed or denied, except for ``view-instance`` which always uses the default permission scheme instead.
.. code-block:: python
from datasette import hookimpl
import random
@hookimpl
def permission_allowed(action):
if action != "view-instance":
# Return True or False at random
return random.random() > 0.5
# Returning None falls back to default permissions
This function can alternatively return an awaitable function which itself returns ``True``, ``False`` or ``None``. You can use this option if you need to execute additional database queries using ``await datasette.execute(...)``.
Here's an example that allows users to view the ``admin_log`` table only if their actor ``id`` is present in the ``admin_users`` table. It aso disallows arbitrary SQL queries for the ``staff.db`` database for all users.
.. code-block:: python
@hookimpl
def permission_allowed(datasette, actor, action, resource):
async def inner():
if action == "execute-sql" and resource == "staff":
return False
if action == "view-table" and resource == (
"staff",
"admin_log",
):
if not actor:
return False
user_id = actor["id"]
result = await datasette.get_database(
"staff"
).execute(
"select count(*) from admin_users where user_id = :user_id",
{"user_id": user_id},
)
return result.first()[0] > 0
return inner
See :ref:`built-in permissions <authentication_permissions>` for a full list of permissions that are included in Datasette core.
Example: `datasette-permissions-sql <https://datasette.io/plugins/datasette-permissions-sql>`_
.. _plugin_hook_permission_resources_sql:
permission_resources_sql(datasette, actor, action)
@ -1915,16 +1981,16 @@ This example adds a new database action for creating a table, if the user has th
.. code-block:: python
from datasette import hookimpl
from datasette.resources import DatabaseResource
@hookimpl
def database_actions(datasette, actor, database):
async def inner():
if not await datasette.allowed(
if not await datasette.permission_allowed(
actor,
"edit-schema",
resource=DatabaseResource("database"),
resource=database,
default=False,
):
return []
return [

View file

@ -283,12 +283,13 @@ Here's a test for that plugin that mocks the HTTPX outbound request:
Registering a plugin for the duration of a test
-----------------------------------------------
When writing tests for plugins you may find it useful to register a test plugin just for the duration of a single test. You can do this using ``datasette.pm.register()`` and ``datasette.pm.unregister()`` like this:
When writing tests for plugins you may find it useful to register a test plugin just for the duration of a single test. You can do this using ``pm.register()`` and ``pm.unregister()`` like this:
.. code-block:: python
from datasette import hookimpl
from datasette.app import Datasette
from datasette.plugins import pm
import pytest
@ -304,14 +305,14 @@ When writing tests for plugins you may find it useful to register a test plugin
(r"^/error$", lambda: 1 / 0),
]
datasette = Datasette()
pm.register(TestPlugin(), name="undo")
try:
# The test implementation goes here
datasette.pm.register(TestPlugin(), name="undo")
datasette = Datasette()
response = await datasette.client.get("/error")
assert response.status_code == 500
finally:
datasette.pm.unregister(name="undo")
pm.unregister(name="undo")
To reuse the same temporary plugin in multiple tests, you can register it inside a fixture in your ``conftest.py`` file like this:

View file

@ -3,8 +3,11 @@ orphan: true
---
(upgrade_guide_v1_a20)=
# Datasette 1.0a20 plugin upgrade guide
<!-- START UPGRADE 1.0a20 -->
Datasette 1.0a20 makes some breaking changes to Datasette's permission system. Plugins need to be updated if they use **any of the following**:
- The `register_permissions()` plugin hook - this should be replaced with `register_actions`

View file

@ -4,7 +4,7 @@
(upgrade_guide_v1)=
## Datasette 0.X -> 1.0
This section reviews breaking changes Datasette ``1.0`` has when upgrading from a ``0.XX`` version. For new features that ``1.0`` offers, see the {ref}`changelog`.
This section reviews breaking changes Datasette ``1.0`` has when upgrading from a ``0.XX`` version. For new features that ``1.0`` offers, see the :ref:`changelog`.
(upgrade_guide_v1_sql_queries)=
### New URL for SQL queries
@ -37,7 +37,7 @@ Metadata was completely revamped for Datasette 1.0. There are a number of relate
(upgrade_guide_v1_metadata_split)=
#### ``metadata.yaml`` split into ``datasette.yaml``
Before Datasette 1.0, the ``metadata.yaml`` file became a kitchen sink if a mix of metadata, configuration, and settings. Now ``metadata.yaml`` is strictly for metadata (ex title and descriptions of database and tables, licensing info, etc). Other settings have been moved to a ``datasette.yml`` configuration file, described in {ref}`configuration`.
Before Datasette 1.0, the ``metadata.yaml`` file became a kitchen sink if a mix of metadata, configuration, and settings. Now ``metadata.yaml`` is strictly for metadata (ex title and descriptions of database and tables, licensing info, etc). Other settings have been moved to a ``datasette.yml`` configuration file, described in :ref:`configuration`.
To start Datasette with both metadata and configuration files, run it like this:
@ -85,14 +85,14 @@ def get_metadata(datasette, key, database, table):
pass
```
Instead, plugins are encouraged to interact directly with Datasette's in-memory metadata tables in SQLite using the following methods on the {ref}`internals_datasette`:
Instead, plugins are encouraged to interact directly with Datasette's in-memory metadata tables in SQLite using the following methods on the :ref:`internals_datasette`:
- {ref}`get_instance_metadata() <datasette_get_instance_metadata>` and {ref}`set_instance_metadata() <datasette_set_instance_metadata>`
- {ref}`get_database_metadata() <datasette_get_database_metadata>` and {ref}`set_database_metadata() <datasette_set_database_metadata>`
- {ref}`get_resource_metadata() <datasette_get_resource_metadata>` and {ref}`set_resource_metadata() <datasette_set_resource_metadata>`
- {ref}`get_column_metadata() <datasette_get_column_metadata>` and {ref}`set_column_metadata() <datasette_set_column_metadata>`
- :ref:`get_instance_metadata() <datasette_get_instance_metadata>` and :ref:`set_instance_metadata() <datasette_set_instance_metadata>`
- :ref:`get_database_metadata() <datasette_get_database_metadata>` and :ref:`set_database_metadata() <datasette_set_database_metadata>`
- :ref:`get_resource_metadata() <datasette_get_resource_metadata>` and :ref:`set_resource_metadata() <datasette_set_resource_metadata>`
- :ref:`get_column_metadata() <datasette_get_column_metadata>` and :ref:`set_column_metadata() <datasette_set_column_metadata>`
A plugin that stores or calculates its own metadata can implement the {ref}`plugin_hook_startup` hook to populate those items on startup, and then call those methods while it is running to persist any new metadata changes.
A plugin that stores or calculates its own metadata can implement the :ref:`plugin_hook_startup` hook to populate those items on startup, and then call those methods while it is running to persist any new metadata changes.
(upgrade_guide_v1_metadata_json_removed)=
#### The ``/metadata.json`` endpoint has been removed
@ -106,10 +106,10 @@ As of Datasette ``1.0a14``, the ``.metadata()`` method on the Datasette Python A
Instead, one should use the following methods on a Datasette class:
- {ref}`get_instance_metadata() <datasette_get_instance_metadata>`
- {ref}`get_database_metadata() <datasette_get_database_metadata>`
- {ref}`get_resource_metadata() <datasette_get_resource_metadata>`
- {ref}`get_column_metadata() <datasette_get_column_metadata>`
- :ref:`get_instance_metadata() <datasette_get_instance_metadata>`
- :ref:`get_database_metadata() <datasette_get_database_metadata>`
- :ref:`get_resource_metadata() <datasette_get_resource_metadata>`
- :ref:`get_column_metadata() <datasette_get_column_metadata>`
```{include} upgrade-1.0a20.md
:heading-offset: 1

View file

@ -28,7 +28,7 @@ dependencies = [
"click-default-group>=1.2.3",
"Jinja2>=2.10.3",
"hupper>=1.9",
"httpx>=0.20,<1.0",
"httpx>=0.20",
"pluggy>=1.0",
"uvicorn>=0.11",
"aiofiles>=0.4",
@ -69,11 +69,11 @@ docs = [
"ruamel.yaml",
]
test = [
"pytest>=9",
"pytest>=5.2.2",
"pytest-xdist>=2.2.1",
"pytest-asyncio>=1.2.0",
"beautifulsoup4>=4.8.1",
"black==25.11.0",
"black==25.9.0",
"blacken-docs==1.20.0",
"pytest-timeout>=1.4.2",
"trustme>=0.7",
@ -93,6 +93,3 @@ datasette = ["templates/*.html"]
[tool.setuptools.dynamic]
version = {attr = "datasette.version.__version__"}
[tool.uv]
package = true

View file

@ -469,7 +469,7 @@ def register_actions(datasette):
description="View a collection",
resource_class=DatabaseResource,
),
# Test actions for test_hook_custom_allowed (global actions - no resource_class)
# Test actions for test_hook_permission_allowed (global actions - no resource_class)
Action(
name="this_is_allowed",
abbr=None,
@ -553,7 +553,7 @@ def register_actions(datasette):
def permission_resources_sql(datasette, actor, action):
from datasette.permissions import PermissionSQL
# Handle test actions used in test_hook_custom_allowed
# Handle test actions used in test_hook_permission_allowed
if action == "this_is_allowed":
return PermissionSQL.allow(reason="test plugin allows this_is_allowed")
elif action == "this_is_denied":

View file

@ -11,6 +11,7 @@ These tests verify:
import pytest
import pytest_asyncio
from datasette.app import Datasette
from datasette.plugins import pm
from datasette.permissions import PermissionSQL
from datasette.resources import TableResource
from datasette import hookimpl
@ -66,7 +67,7 @@ async def test_allowed_resources_global_allow(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
# Use the new allowed_resources() method
@ -86,7 +87,7 @@ async def test_allowed_resources_global_allow(test_ds):
assert ("production", "orders") in table_set
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -105,7 +106,7 @@ async def test_allowed_specific_resource(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
actor = {"id": "bob", "role": "analyst"}
@ -129,7 +130,7 @@ async def test_allowed_specific_resource(test_ds):
)
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -147,7 +148,7 @@ async def test_allowed_resources_include_reasons(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
# Use allowed_resources with include_reasons to get debugging info
@ -169,7 +170,7 @@ async def test_allowed_resources_include_reasons(test_ds):
assert "analyst access" in reasons_text
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -189,7 +190,7 @@ async def test_child_deny_overrides_parent_allow(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
actor = {"id": "bob", "role": "analyst"}
@ -218,7 +219,7 @@ async def test_child_deny_overrides_parent_allow(test_ds):
)
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -238,7 +239,7 @@ async def test_child_allow_overrides_parent_deny(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
actor = {"id": "carol"}
@ -263,7 +264,7 @@ async def test_child_allow_overrides_parent_deny(test_ds):
)
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -287,7 +288,7 @@ async def test_sql_does_filtering_not_python(test_ds):
return PermissionSQL(sql=sql)
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
actor = {"id": "dave"}
@ -313,4 +314,4 @@ async def test_sql_does_filtering_not_python(test_ds):
assert tables[0].child == "users"
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")

View file

@ -8,6 +8,7 @@ based on permission rules from plugins and configuration.
import pytest
import pytest_asyncio
from datasette.app import Datasette
from datasette.plugins import pm
from datasette.permissions import PermissionSQL
from datasette import hookimpl
@ -61,7 +62,7 @@ async def test_tables_endpoint_global_access(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
# Use the allowed_resources API directly
@ -86,7 +87,7 @@ async def test_tables_endpoint_global_access(test_ds):
assert "production/orders" in table_names
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -101,7 +102,7 @@ async def test_tables_endpoint_database_restriction(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
page = await test_ds.allowed_resources(
@ -129,7 +130,7 @@ async def test_tables_endpoint_database_restriction(test_ds):
# Note: default_permissions.py provides default allows, so we just check analytics are present
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -148,7 +149,7 @@ async def test_tables_endpoint_table_exception(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
page = await test_ds.allowed_resources("view-table", {"id": "carol"})
@ -171,7 +172,7 @@ async def test_tables_endpoint_table_exception(test_ds):
assert "analytics/sensitive" not in table_names
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -190,7 +191,7 @@ async def test_tables_endpoint_deny_overrides_allow(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
page = await test_ds.allowed_resources(
@ -213,7 +214,7 @@ async def test_tables_endpoint_deny_overrides_allow(test_ds):
assert "analytics/sensitive" not in table_names
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -256,7 +257,7 @@ async def test_tables_endpoint_specific_table_only(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
page = await test_ds.allowed_resources("view-table", {"id": "dave"})
@ -279,7 +280,7 @@ async def test_tables_endpoint_specific_table_only(test_ds):
assert "production/orders" in table_names
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio
@ -294,7 +295,7 @@ async def test_tables_endpoint_empty_result(test_ds):
return None
plugin = PermissionRulesPlugin(rules_callback)
test_ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
page = await test_ds.allowed_resources("view-table", {"id": "blocked"})
@ -310,7 +311,7 @@ async def test_tables_endpoint_empty_result(test_ds):
assert len(result) == 0
finally:
test_ds.pm.unregister(plugin, name="test_plugin")
pm.unregister(plugin, name="test_plugin")
@pytest.mark.asyncio

View file

@ -142,12 +142,10 @@ def test_metadata_yaml():
settings=[],
secret=None,
root=False,
default_deny=False,
token=None,
actor=None,
version_note=None,
get=None,
headers=False,
help_settings=False,
pdb=False,
crossdb=False,

View file

@ -52,26 +52,6 @@ def test_serve_with_get(tmp_path_factory):
pm.unregister(to_unregister)
def test_serve_with_get_headers():
runner = CliRunner()
result = runner.invoke(
cli,
[
"serve",
"--memory",
"--get",
"/_memory/",
"--headers",
],
)
# exit_code is 1 because it wasn't a 200 response
assert result.exit_code == 1, result.output
lines = result.output.splitlines()
assert lines and lines[0] == "HTTP/1.1 302"
assert "location: /_memory" in lines
assert "content-type: text/html; charset=utf-8" in lines
def test_serve_with_get_and_token():
runner = CliRunner()
result1 = runner.invoke(

View file

@ -97,10 +97,3 @@ def test_custom_route_pattern_404(custom_pages_client):
assert response.status == 404
assert "<h1>Error 404</h1>" in response.text
assert ">Oh no</" in response.text
def test_custom_route_pattern_with_slash_slash_302(custom_pages_client):
# https://github.com/simonw/datasette/issues/2429
response = custom_pages_client.get("//example.com/")
assert response.status == 302
assert response.headers["location"] == "/example.com"

View file

@ -1,129 +0,0 @@
import pytest
from datasette.app import Datasette
from datasette.resources import DatabaseResource, TableResource
@pytest.mark.asyncio
async def test_default_deny_denies_default_permissions():
"""Test that default_deny=True denies default permissions"""
# Without default_deny, anonymous users can view instance/database/tables
ds_normal = Datasette()
await ds_normal.invoke_startup()
# Add a test database
db = ds_normal.add_memory_database("test_db_normal")
await db.execute_write("create table test_table (id integer primary key)")
await ds_normal._refresh_schemas() # Trigger catalog refresh
# Test default behavior - anonymous user should be able to view
response = await ds_normal.client.get("/")
assert response.status_code == 200
response = await ds_normal.client.get("/test_db_normal")
assert response.status_code == 200
response = await ds_normal.client.get("/test_db_normal/test_table")
assert response.status_code == 200
# With default_deny=True, anonymous users should be denied
ds_deny = Datasette(default_deny=True)
await ds_deny.invoke_startup()
# Add the same test database
db = ds_deny.add_memory_database("test_db_deny")
await db.execute_write("create table test_table (id integer primary key)")
await ds_deny._refresh_schemas() # Trigger catalog refresh
# Anonymous user should be denied
response = await ds_deny.client.get("/")
assert response.status_code == 403
response = await ds_deny.client.get("/test_db_deny")
assert response.status_code == 403
response = await ds_deny.client.get("/test_db_deny/test_table")
assert response.status_code == 403
@pytest.mark.asyncio
async def test_default_deny_with_root_user():
"""Test that root user still has access when default_deny=True"""
ds = Datasette(default_deny=True)
ds.root_enabled = True
await ds.invoke_startup()
root_actor = {"id": "root"}
# Root user should have all permissions even with default_deny
assert await ds.allowed(action="view-instance", actor=root_actor) is True
assert (
await ds.allowed(
action="view-database",
actor=root_actor,
resource=DatabaseResource("test_db"),
)
is True
)
assert (
await ds.allowed(
action="view-table",
actor=root_actor,
resource=TableResource("test_db", "test_table"),
)
is True
)
assert (
await ds.allowed(
action="execute-sql", actor=root_actor, resource=DatabaseResource("test_db")
)
is True
)
@pytest.mark.asyncio
async def test_default_deny_with_config_allow():
"""Test that config allow rules still work with default_deny=True"""
ds = Datasette(default_deny=True, config={"allow": {"id": "user1"}})
await ds.invoke_startup()
# Anonymous user should be denied
assert await ds.allowed(action="view-instance", actor=None) is False
# Authenticated user with explicit permission should have access
assert await ds.allowed(action="view-instance", actor={"id": "user1"}) is True
# Different user should be denied
assert await ds.allowed(action="view-instance", actor={"id": "user2"}) is False
@pytest.mark.asyncio
async def test_default_deny_basic_permissions():
"""Test that default_deny=True denies basic permissions"""
ds = Datasette(default_deny=True)
await ds.invoke_startup()
# Anonymous user should be denied all default permissions
assert await ds.allowed(action="view-instance", actor=None) is False
assert (
await ds.allowed(
action="view-database", actor=None, resource=DatabaseResource("test_db")
)
is False
)
assert (
await ds.allowed(
action="view-table",
actor=None,
resource=TableResource("test_db", "test_table"),
)
is False
)
assert (
await ds.allowed(
action="execute-sql", actor=None, resource=DatabaseResource("test_db")
)
is False
)
# Authenticated user without explicit permission should also be denied
assert await ds.allowed(action="view-instance", actor={"id": "user"}) is False

View file

@ -28,9 +28,8 @@ def settings_headings():
return get_headings((docs_path / "settings.rst").read_text(), "~")
def test_settings_are_documented(settings_headings, subtests):
for setting in app.SETTINGS:
with subtests.test(setting=setting.name):
@pytest.mark.parametrize("setting", app.SETTINGS)
def test_settings_are_documented(settings_headings, setting):
assert setting.name in settings_headings
@ -39,13 +38,13 @@ def plugin_hooks_content():
return (docs_path / "plugin_hooks.rst").read_text()
def test_plugin_hooks_are_documented(plugin_hooks_content, subtests):
@pytest.mark.parametrize(
"plugin", [name for name in dir(app.pm.hook) if not name.startswith("_")]
)
def test_plugin_hooks_are_documented(plugin, plugin_hooks_content):
headings = set()
headings.update(get_headings(plugin_hooks_content, "-"))
headings.update(get_headings(plugin_hooks_content, "~"))
plugins = [name for name in dir(app.pm.hook) if not name.startswith("_")]
for plugin in plugins:
with subtests.test(plugin=plugin):
assert plugin in headings
hook_caller = getattr(app.pm.hook, plugin)
arg_names = [a for a in hook_caller.spec.argnames if a != "__multicall__"]
@ -69,10 +68,8 @@ def documented_views():
return view_labels
def test_view_classes_are_documented(documented_views, subtests):
view_classes = [v for v in dir(app) if v.endswith("View")]
for view_class in view_classes:
with subtests.test(view_class=view_class):
@pytest.mark.parametrize("view_class", [v for v in dir(app) if v.endswith("View")])
def test_view_classes_are_documented(documented_views, view_class):
assert view_class in documented_views
@ -88,10 +85,9 @@ def documented_table_filters():
}
def test_table_filters_are_documented(documented_table_filters, subtests):
for f in Filters._filters:
with subtests.test(filter=f.key):
assert f.key in documented_table_filters
@pytest.mark.parametrize("filter", [f.key for f in Filters._filters])
def test_table_filters_are_documented(documented_table_filters, filter):
assert filter in documented_table_filters
@pytest.fixture(scope="session")
@ -105,9 +101,8 @@ def documented_fns():
}
def test_functions_marked_with_documented_are_documented(documented_fns, subtests):
for fn in utils.functions_marked_as_documented:
with subtests.test(fn=fn.__name__):
@pytest.mark.parametrize("fn", utils.functions_marked_as_documented)
def test_functions_marked_with_documented_are_documented(documented_fns, fn):
assert fn.__name__ in documented_fns

View file

@ -2,6 +2,7 @@
# -- start datasette_with_plugin_fixture --
from datasette import hookimpl
from datasette.app import Datasette
from datasette.plugins import pm
import pytest
import pytest_asyncio
@ -17,12 +18,11 @@ async def datasette_with_plugin():
(r"^/error$", lambda: 1 / 0),
]
datasette = Datasette()
datasette.pm.register(TestPlugin(), name="undo")
pm.register(TestPlugin(), name="undo")
try:
yield datasette
yield Datasette()
finally:
datasette.pm.unregister(name="undo")
pm.unregister(name="undo")
# -- end datasette_with_plugin_fixture --

View file

@ -142,7 +142,7 @@ async def test_database_page(ds_client):
# And a list of tables
for fragment in (
'<h2 id="tables">Tables',
'<h2 id="tables">Tables</h2>',
'<h3><a href="/fixtures/sortable">sortable</a></h3>',
"<p><em>pk, foreign_key_with_label, foreign_key_with_blank_label, ",
):
@ -935,7 +935,7 @@ async def test_edit_sql_link_on_canned_queries(ds_client, path, expected):
@pytest.mark.parametrize(
"has_permission",
"permission_allowed",
[
pytest.param(
True,
@ -943,15 +943,15 @@ async def test_edit_sql_link_on_canned_queries(ds_client, path, expected):
False,
],
)
def test_edit_sql_link_not_shown_if_user_lacks_permission(has_permission):
def test_edit_sql_link_not_shown_if_user_lacks_permission(permission_allowed):
with make_app_client(
config={
"allow_sql": None if has_permission else {"id": "not-you"},
"allow_sql": None if permission_allowed else {"id": "not-you"},
"databases": {"fixtures": {"queries": {"simple": "select 1 + 1"}}},
}
) as client:
response = client.get("/fixtures/simple")
if has_permission:
if permission_allowed:
assert "Edit SQL" in response.text
else:
assert "Edit SQL" not in response.text
@ -1194,21 +1194,6 @@ async def test_actions_page(ds_client):
ds_client.ds.root_enabled = original_root_enabled
@pytest.mark.asyncio
async def test_actions_page_does_not_display_none_string(ds_client):
"""Ensure the Resource column doesn't display the string 'None' for null values."""
# https://github.com/simonw/datasette/issues/2599
original_root_enabled = ds_client.ds.root_enabled
try:
ds_client.ds.root_enabled = True
cookies = {"ds_actor": ds_client.actor_cookie({"id": "root"})}
response = await ds_client.get("/-/actions", cookies=cookies)
assert response.status_code == 200
assert "<code>None</code>" not in response.text
finally:
ds_client.ds.root_enabled = original_root_enabled
@pytest.mark.asyncio
async def test_permission_debug_tabs_with_query_string(ds_client):
"""Test that navigation tabs persist query strings across Check, Allowed, and Rules pages"""

View file

@ -91,51 +91,3 @@ async def test_internal_foreign_key_references(ds_client):
)
await internal_db.execute_fn(inner)
@pytest.mark.asyncio
async def test_stale_catalog_entry_database_fix(tmp_path):
"""
Test for https://github.com/simonw/datasette/issues/2605
When the internal database persists across restarts and has entries in
catalog_databases for databases that no longer exist, accessing the
index page should not cause a 500 error (KeyError).
"""
from datasette.app import Datasette
internal_db_path = str(tmp_path / "internal.db")
data_db_path = str(tmp_path / "data.db")
# Create a data database file
import sqlite3
conn = sqlite3.connect(data_db_path)
conn.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY)")
conn.close()
# First Datasette instance: with the data database and persistent internal db
ds1 = Datasette(files=[data_db_path], internal=internal_db_path)
await ds1.invoke_startup()
# Access the index page to populate the internal catalog
response = await ds1.client.get("/")
assert "data" in ds1.databases
assert response.status_code == 200
# Second Datasette instance: reusing internal.db but WITHOUT the data database
# This simulates restarting Datasette after removing a database
ds2 = Datasette(internal=internal_db_path)
await ds2.invoke_startup()
# The database is not in ds2.databases
assert "data" not in ds2.databases
# Accessing the index page should NOT cause a 500 error
# This is the bug: it currently raises KeyError when trying to
# access ds.databases["data"] for the stale catalog entry
response = await ds2.client.get("/")
assert response.status_code == 200, (
f"Index page should return 200, not {response.status_code}. "
"This fails due to stale catalog entries causing KeyError."
)

View file

@ -1,7 +1,6 @@
import httpx
import pytest
import pytest_asyncio
from datasette.app import Datasette
@pytest_asyncio.fixture
@ -10,23 +9,6 @@ async def datasette(ds_client):
return ds_client.ds
@pytest_asyncio.fixture
async def datasette_with_permissions():
"""A datasette instance with permission restrictions for testing"""
ds = Datasette(config={"databases": {"test_db": {"allow": {"id": "admin"}}}})
await ds.invoke_startup()
db = ds.add_memory_database("test_datasette_with_permissions", name="test_db")
await db.execute_write(
"create table if not exists test_table (id integer primary key, name text)"
)
await db.execute_write(
"insert or ignore into test_table (id, name) values (1, 'Alice')"
)
# Trigger catalog refresh
await ds.client.get("/")
return ds
@pytest.mark.asyncio
@pytest.mark.parametrize(
"method,path,expected_status",
@ -83,231 +65,3 @@ async def test_client_path(datasette, prefix, expected_path):
assert path == expected_path
finally:
datasette._settings["base_url"] = original_base_url
@pytest.mark.asyncio
async def test_skip_permission_checks_allows_forbidden_access(
datasette_with_permissions,
):
"""Test that skip_permission_checks=True bypasses permission checks"""
ds = datasette_with_permissions
# Without skip_permission_checks, anonymous user should get 403 for protected database
response = await ds.client.get("/test_db.json")
assert response.status_code == 403
# With skip_permission_checks=True, should get 200
response = await ds.client.get("/test_db.json", skip_permission_checks=True)
assert response.status_code == 200
data = response.json()
assert data["database"] == "test_db"
@pytest.mark.asyncio
async def test_skip_permission_checks_on_table(datasette_with_permissions):
"""Test skip_permission_checks works for table access"""
ds = datasette_with_permissions
# Without skip_permission_checks, should get 403
response = await ds.client.get("/test_db/test_table.json")
assert response.status_code == 403
# With skip_permission_checks=True, should get table data
response = await ds.client.get(
"/test_db/test_table.json", skip_permission_checks=True
)
assert response.status_code == 200
data = response.json()
assert data["rows"] == [{"id": 1, "name": "Alice"}]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"method", ["get", "post", "put", "patch", "delete", "options", "head"]
)
async def test_skip_permission_checks_all_methods(datasette_with_permissions, method):
"""Test that skip_permission_checks works with all HTTP methods"""
ds = datasette_with_permissions
# All methods should work with skip_permission_checks=True
client_method = getattr(ds.client, method)
response = await client_method("/test_db.json", skip_permission_checks=True)
# We don't check status code since some methods might not be allowed,
# but we verify the request doesn't fail due to permissions
assert isinstance(response, httpx.Response)
@pytest.mark.asyncio
async def test_skip_permission_checks_request_method(datasette_with_permissions):
"""Test that skip_permission_checks works with client.request()"""
ds = datasette_with_permissions
# Without skip_permission_checks
response = await ds.client.request("GET", "/test_db.json")
assert response.status_code == 403
# With skip_permission_checks=True
response = await ds.client.request(
"GET", "/test_db.json", skip_permission_checks=True
)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_skip_permission_checks_isolated_to_request(datasette_with_permissions):
"""Test that skip_permission_checks doesn't affect other concurrent requests"""
ds = datasette_with_permissions
# First request with skip_permission_checks=True should succeed
response1 = await ds.client.get("/test_db.json", skip_permission_checks=True)
assert response1.status_code == 200
# Subsequent request without it should still get 403
response2 = await ds.client.get("/test_db.json")
assert response2.status_code == 403
# And another with skip should succeed again
response3 = await ds.client.get("/test_db.json", skip_permission_checks=True)
assert response3.status_code == 200
@pytest.mark.asyncio
async def test_skip_permission_checks_with_admin_actor(datasette_with_permissions):
"""Test that skip_permission_checks works even when actor is provided"""
ds = datasette_with_permissions
# Admin actor should normally have access
admin_cookies = {"ds_actor": ds.client.actor_cookie({"id": "admin"})}
response = await ds.client.get("/test_db.json", cookies=admin_cookies)
assert response.status_code == 200
# Non-admin actor should get 403
user_cookies = {"ds_actor": ds.client.actor_cookie({"id": "user"})}
response = await ds.client.get("/test_db.json", cookies=user_cookies)
assert response.status_code == 403
# Non-admin actor with skip_permission_checks=True should get 200
response = await ds.client.get(
"/test_db.json", cookies=user_cookies, skip_permission_checks=True
)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_skip_permission_checks_shows_denied_tables():
"""Test that skip_permission_checks=True shows tables from denied databases in /-/tables.json"""
ds = Datasette(
config={
"databases": {
"fixtures": {"allow": False} # Deny all access to this database
}
}
)
await ds.invoke_startup()
db = ds.add_memory_database("fixtures")
await db.execute_write(
"CREATE TABLE test_table (id INTEGER PRIMARY KEY, name TEXT)"
)
await db.execute_write("INSERT INTO test_table (id, name) VALUES (1, 'Alice')")
await ds._refresh_schemas()
# Without skip_permission_checks, tables from denied database should not appear in /-/tables.json
response = await ds.client.get("/-/tables.json")
assert response.status_code == 200
data = response.json()
table_names = [match["name"] for match in data["matches"]]
# Should not see any fixtures tables since access is denied
fixtures_tables = [name for name in table_names if name.startswith("fixtures:")]
assert len(fixtures_tables) == 0
# With skip_permission_checks=True, tables from denied database SHOULD appear
response = await ds.client.get("/-/tables.json", skip_permission_checks=True)
assert response.status_code == 200
data = response.json()
table_names = [match["name"] for match in data["matches"]]
# Should see fixtures tables when permission checks are skipped
assert "fixtures: test_table" in table_names
@pytest.mark.asyncio
async def test_in_client_returns_false_outside_request(datasette):
"""Test that datasette.in_client() returns False outside of a client request"""
assert datasette.in_client() is False
@pytest.mark.asyncio
async def test_in_client_returns_true_inside_request():
"""Test that datasette.in_client() returns True inside a client request"""
from datasette import hookimpl, Response
class TestPlugin:
__name__ = "test_in_client_plugin"
@hookimpl
def register_routes(self):
async def test_view(datasette):
# Assert in_client() returns True within the view
assert datasette.in_client() is True
return Response.json({"in_client": datasette.in_client()})
return [
(r"^/-/test-in-client$", test_view),
]
ds = Datasette()
await ds.invoke_startup()
ds.pm.register(TestPlugin(), name="test_in_client_plugin")
try:
# Outside of a client request, should be False
assert ds.in_client() is False
# Make a request via datasette.client
response = await ds.client.get("/-/test-in-client")
assert response.status_code == 200
assert response.json()["in_client"] is True
# After the request, should be False again
assert ds.in_client() is False
finally:
ds.pm.unregister(name="test_in_client_plugin")
@pytest.mark.asyncio
async def test_in_client_with_skip_permission_checks():
"""Test that in_client() works regardless of skip_permission_checks value"""
from datasette import hookimpl
from datasette.utils.asgi import Response
in_client_values = []
class TestPlugin:
__name__ = "test_in_client_skip_plugin"
@hookimpl
def register_routes(self):
async def test_view(datasette):
in_client_values.append(datasette.in_client())
return Response.json({"in_client": datasette.in_client()})
return [
(r"^/-/test-in-client$", test_view),
]
ds = Datasette(config={"databases": {"test_db": {"allow": {"id": "admin"}}}})
await ds.invoke_startup()
ds.pm.register(TestPlugin(), name="test_in_client_skip_plugin")
try:
# Request without skip_permission_checks
await ds.client.get("/-/test-in-client")
# Request with skip_permission_checks=True
await ds.client.get("/-/test-in-client", skip_permission_checks=True)
# Both should have detected in_client as True
assert (
len(in_client_values) == 2
), f"Expected 2 values, got {len(in_client_values)}"
assert all(in_client_values), f"Expected all True, got {in_client_values}"
finally:
ds.pm.unregister(name="test_in_client_skip_plugin")

View file

@ -439,6 +439,7 @@ async def test_execute_sql_requires_view_database():
be able to execute SQL on that database.
"""
from datasette.permissions import PermissionSQL
from datasette.plugins import pm
from datasette import hookimpl
class TestPermissionPlugin:
@ -463,12 +464,11 @@ async def test_execute_sql_requires_view_database():
return []
plugin = TestPermissionPlugin()
ds = Datasette()
await ds.invoke_startup()
ds.pm.register(plugin, name="test_plugin")
pm.register(plugin, name="test_plugin")
try:
ds = Datasette()
await ds.invoke_startup()
ds.add_memory_database("secret")
await ds.refresh_schemas()
@ -498,4 +498,4 @@ async def test_execute_sql_requires_view_database():
f"but got {response.status_code}"
)
finally:
ds.pm.unregister(plugin)
pm.unregister(plugin)

View file

@ -1323,20 +1323,6 @@ async def test_actor_restrictions(
("dbname2", "tablename"),
False,
),
# Table-level restriction allows access to that specific table
(
{"r": {"dbname": {"tablename": ["view-table"]}}},
"view-table",
("dbname", "tablename"),
True,
),
# But not to a different table in the same database
(
{"r": {"dbname": {"tablename": ["view-table"]}}},
"view-table",
("dbname", "other_table"),
False,
),
),
)
async def test_restrictions_allow_action(restrictions, action, resource, expected):
@ -1667,48 +1653,3 @@ async def test_permission_check_view_requires_debug_permission():
data = response.json()
assert data["action"] == "view-instance"
assert data["allowed"] is True
@pytest.mark.asyncio
async def test_root_allow_block_with_table_restricted_actor():
"""
Test that root-level allow: blocks are processed for actors with
table-level restrictions.
This covers the case in config.py is_in_restriction_allowlist() where
parent=None, child=None and actor has table restrictions but not global.
"""
from datasette.resources import TableResource
# Config with root-level allow block that denies non-admin users
ds = Datasette(
config={
"allow": {"id": "admin"}, # Root-level allow block
}
)
await ds.invoke_startup()
db = ds.add_memory_database("mydb")
await db.execute_write("create table t1 (id integer primary key)")
await ds.client.get("/") # Trigger catalog refresh
# Actor with table-level restrictions only (not global)
actor = {"id": "user", "_r": {"r": {"mydb": {"t1": ["view-table"]}}}}
# The root-level allow: {id: admin} should be processed and deny this user
# because they're not "admin", even though they have table restrictions
result = await ds.allowed(
action="view-table",
resource=TableResource("mydb", "t1"),
actor=actor,
)
# Should be False because root allow: {id: admin} denies non-admin users
assert result is False
# But admin with same restrictions should be allowed
admin_actor = {"id": "admin", "_r": {"r": {"mydb": {"t1": ["view-table"]}}}}
result = await ds.allowed(
action="view-table",
resource=TableResource("mydb", "t1"),
actor=admin_actor,
)
assert result is True

View file

@ -677,7 +677,7 @@ async def test_existing_scope_actor_respected(ds_client):
("this_is_denied_async", False),
],
)
async def test_hook_custom_allowed(action, expected):
async def test_hook_permission_allowed(action, expected):
# Test actions and permission logic are defined in tests/plugins/my_plugin.py
ds = Datasette(plugins_dir=PLUGINS_DIR)
await ds.invoke_startup()
@ -691,7 +691,7 @@ async def test_hook_permission_resources_sql():
await ds.invoke_startup()
collected = []
for block in ds.pm.hook.permission_resources_sql(
for block in pm.hook.permission_resources_sql(
datasette=ds,
actor={"id": "alice"},
action="view-table",
@ -1161,12 +1161,12 @@ async def test_hook_filters_from_request(ds_client):
if request.args.get("_nothing"):
return FilterArguments(["1 = 0"], human_descriptions=["NOTHING"])
ds_client.ds.pm.register(ReturnNothingPlugin(), name="ReturnNothingPlugin")
pm.register(ReturnNothingPlugin(), name="ReturnNothingPlugin")
response = await ds_client.get("/fixtures/facetable?_nothing=1")
assert "0 rows\n where NOTHING" in response.text
json_response = await ds_client.get("/fixtures/facetable.json?_nothing=1")
assert json_response.json()["rows"] == []
ds_client.ds.pm.unregister(name="ReturnNothingPlugin")
pm.unregister(name="ReturnNothingPlugin")
@pytest.mark.asyncio
@ -1327,7 +1327,7 @@ async def test_hook_actors_from_ids():
return inner
try:
ds.pm.register(ActorsFromIdsPlugin(), name="ActorsFromIdsPlugin")
pm.register(ActorsFromIdsPlugin(), name="ActorsFromIdsPlugin")
actors2 = await ds.actors_from_ids(["3", "5", "7"])
assert actors2 == {
"3": {"id": "3", "name": "Cate Blanchett"},
@ -1335,7 +1335,7 @@ async def test_hook_actors_from_ids():
"7": {"id": "7", "name": "Sarah Paulson"},
}
finally:
ds.pm.unregister(name="ReturnNothingPlugin")
pm.unregister(name="ReturnNothingPlugin")
@pytest.mark.asyncio
@ -1350,14 +1350,14 @@ async def test_plugin_is_installed():
return {}
try:
datasette.pm.register(DummyPlugin(), name="DummyPlugin")
pm.register(DummyPlugin(), name="DummyPlugin")
response = await datasette.client.get("/-/plugins.json")
assert response.status_code == 200
installed_plugins = {p["name"] for p in response.json()}
assert "DummyPlugin" in installed_plugins
finally:
datasette.pm.unregister(name="DummyPlugin")
pm.unregister(name="DummyPlugin")
@pytest.mark.asyncio
@ -1384,7 +1384,7 @@ async def test_hook_jinja2_environment_from_request(tmpdir):
datasette = Datasette(memory=True)
try:
datasette.pm.register(EnvironmentPlugin(), name="EnvironmentPlugin")
pm.register(EnvironmentPlugin(), name="EnvironmentPlugin")
response = await datasette.client.get("/")
assert response.status_code == 200
assert "Hello museums!" not in response.text
@ -1395,7 +1395,7 @@ async def test_hook_jinja2_environment_from_request(tmpdir):
assert response2.status_code == 200
assert "Hello museums!" in response2.text
finally:
datasette.pm.unregister(name="EnvironmentPlugin")
pm.unregister(name="EnvironmentPlugin")
class SlotPlugin:
@ -1433,48 +1433,48 @@ class SlotPlugin:
@pytest.mark.asyncio
async def test_hook_top_homepage():
datasette = Datasette(memory=True)
try:
datasette.pm.register(SlotPlugin(), name="SlotPlugin")
pm.register(SlotPlugin(), name="SlotPlugin")
datasette = Datasette(memory=True)
response = await datasette.client.get("/?z=foo")
assert response.status_code == 200
assert "Xtop_homepage:foo" in response.text
finally:
datasette.pm.unregister(name="SlotPlugin")
pm.unregister(name="SlotPlugin")
@pytest.mark.asyncio
async def test_hook_top_database():
datasette = Datasette(memory=True)
try:
datasette.pm.register(SlotPlugin(), name="SlotPlugin")
pm.register(SlotPlugin(), name="SlotPlugin")
datasette = Datasette(memory=True)
response = await datasette.client.get("/_memory?z=bar")
assert response.status_code == 200
assert "Xtop_database:_memory:bar" in response.text
finally:
datasette.pm.unregister(name="SlotPlugin")
pm.unregister(name="SlotPlugin")
@pytest.mark.asyncio
async def test_hook_top_table(ds_client):
try:
ds_client.ds.pm.register(SlotPlugin(), name="SlotPlugin")
pm.register(SlotPlugin(), name="SlotPlugin")
response = await ds_client.get("/fixtures/facetable?z=baz")
assert response.status_code == 200
assert "Xtop_table:fixtures:facetable:baz" in response.text
finally:
ds_client.ds.pm.unregister(name="SlotPlugin")
pm.unregister(name="SlotPlugin")
@pytest.mark.asyncio
async def test_hook_top_row(ds_client):
try:
ds_client.ds.pm.register(SlotPlugin(), name="SlotPlugin")
pm.register(SlotPlugin(), name="SlotPlugin")
response = await ds_client.get("/fixtures/facet_cities/1?z=bax")
assert response.status_code == 200
assert "Xtop_row:fixtures:facet_cities:San Francisco:bax" in response.text
finally:
ds_client.ds.pm.unregister(name="SlotPlugin")
pm.unregister(name="SlotPlugin")
@pytest.mark.asyncio

View file

@ -57,20 +57,12 @@ def test_publish_cloudrun_prompts_for_service(
"Service name: input-service"
) == result.output.strip()
assert 0 == result.exit_code
tag = "us-docker.pkg.dev/myproject/datasette/datasette-input-service"
tag = "gcr.io/myproject/datasette-input-service"
mock_call.assert_has_calls(
[
mock.call(
"gcloud services enable artifactregistry.googleapis.com --project myproject --quiet",
shell=True,
),
mock.call(
"gcloud artifacts repositories describe datasette --project myproject --location us --quiet",
shell=True,
),
mock.call(f"gcloud builds submit --tag {tag}", shell=True),
mock.call(
"gcloud run deploy --allow-unauthenticated --platform=managed --image {} input-service --max-instances 1".format(
"gcloud run deploy --allow-unauthenticated --platform=managed --image {} input-service".format(
tag
),
shell=True,
@ -94,20 +86,12 @@ def test_publish_cloudrun(mock_call, mock_output, mock_which, tmp_path_factory):
cli.cli, ["publish", "cloudrun", "test.db", "--service", "test"]
)
assert 0 == result.exit_code
tag = f"us-docker.pkg.dev/{mock_output.return_value}/datasette/datasette-test"
tag = f"gcr.io/{mock_output.return_value}/datasette-test"
mock_call.assert_has_calls(
[
mock.call(
f"gcloud services enable artifactregistry.googleapis.com --project {mock_output.return_value} --quiet",
shell=True,
),
mock.call(
f"gcloud artifacts repositories describe datasette --project {mock_output.return_value} --location us --quiet",
shell=True,
),
mock.call(f"gcloud builds submit --tag {tag}", shell=True),
mock.call(
"gcloud run deploy --allow-unauthenticated --platform=managed --image {} test --max-instances 1".format(
"gcloud run deploy --allow-unauthenticated --platform=managed --image {} test".format(
tag
),
shell=True,
@ -183,7 +167,7 @@ def test_publish_cloudrun_memory_cpu(
assert 2 == result.exit_code
return
assert 0 == result.exit_code
tag = f"us-docker.pkg.dev/{mock_output.return_value}/datasette/datasette-test"
tag = f"gcr.io/{mock_output.return_value}/datasette-test"
expected_call = (
"gcloud run deploy --allow-unauthenticated --platform=managed"
" --image {} test".format(tag)
@ -195,18 +179,8 @@ def test_publish_cloudrun_memory_cpu(
expected_call += " --cpu {}".format(cpu)
if timeout:
expected_build_call += f" --timeout {timeout}"
# max_instances defaults to 1
expected_call += " --max-instances 1"
mock_call.assert_has_calls(
[
mock.call(
f"gcloud services enable artifactregistry.googleapis.com --project {mock_output.return_value} --quiet",
shell=True,
),
mock.call(
f"gcloud artifacts repositories describe datasette --project {mock_output.return_value} --location us --quiet",
shell=True,
),
mock.call(expected_build_call, shell=True),
mock.call(
expected_call,

View file

@ -13,6 +13,7 @@ async def test_multiple_restriction_sources_intersect():
provide restriction_sql - both must pass for access to be granted.
"""
from datasette import hookimpl
from datasette.plugins import pm
class RestrictivePlugin:
__name__ = "RestrictivePlugin"
@ -28,12 +29,11 @@ async def test_multiple_restriction_sources_intersect():
return None
plugin = RestrictivePlugin()
ds = Datasette()
await ds.invoke_startup()
ds.pm.register(plugin, name="restrictive_plugin")
pm.register(plugin, name="restrictive_plugin")
try:
ds = Datasette()
await ds.invoke_startup()
db1 = ds.add_memory_database("db1_multi_intersect")
db2 = ds.add_memory_database("db2_multi_intersect")
await db1.execute_write("CREATE TABLE t1 (id INTEGER)")
@ -55,7 +55,7 @@ async def test_multiple_restriction_sources_intersect():
assert ("db1_multi_intersect", "t1") in resources
assert ("db2_multi_intersect", "t1") not in resources
finally:
ds.pm.unregister(name="restrictive_plugin")
pm.unregister(name="restrictive_plugin")
@pytest.mark.asyncio
@ -265,6 +265,7 @@ async def test_permission_resources_sql_multiple_restriction_sources_intersect()
provide restriction_sql - both must pass for access to be granted.
"""
from datasette import hookimpl
from datasette.plugins import pm
class RestrictivePlugin:
__name__ = "RestrictivePlugin"
@ -280,12 +281,11 @@ async def test_permission_resources_sql_multiple_restriction_sources_intersect()
return None
plugin = RestrictivePlugin()
ds = Datasette()
await ds.invoke_startup()
ds.pm.register(plugin, name="restrictive_plugin")
pm.register(plugin, name="restrictive_plugin")
try:
ds = Datasette()
await ds.invoke_startup()
db1 = ds.add_memory_database("db1_multi_restrictions")
db2 = ds.add_memory_database("db2_multi_restrictions")
await db1.execute_write("CREATE TABLE t1 (id INTEGER)")
@ -312,4 +312,4 @@ async def test_permission_resources_sql_multiple_restriction_sources_intersect()
assert ("db1_multi_restrictions", "t1") in resources
assert ("db2_multi_restrictions", "t1") not in resources
finally:
ds.pm.unregister(name="restrictive_plugin")
pm.unregister(name="restrictive_plugin")

View file

@ -1,248 +0,0 @@
import asyncio
import pytest
import pytest_asyncio
from datasette.app import Datasette
@pytest_asyncio.fixture(scope="module")
async def schema_ds():
"""Create a Datasette instance with test databases and permission config."""
ds = Datasette(
config={
"databases": {
"schema_private_db": {"allow": {"id": "root"}},
}
}
)
# Create public database with multiple tables
public_db = ds.add_memory_database("schema_public_db")
await public_db.execute_write(
"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)"
)
await public_db.execute_write(
"CREATE TABLE IF NOT EXISTS posts (id INTEGER PRIMARY KEY, title TEXT)"
)
await public_db.execute_write(
"CREATE VIEW IF NOT EXISTS recent_posts AS SELECT * FROM posts ORDER BY id DESC"
)
# Create a database with restricted access (requires root permission)
private_db = ds.add_memory_database("schema_private_db")
await private_db.execute_write(
"CREATE TABLE IF NOT EXISTS secret_data (id INTEGER PRIMARY KEY, value TEXT)"
)
# Create an empty database
ds.add_memory_database("schema_empty_db")
return ds
@pytest.mark.asyncio
@pytest.mark.parametrize(
"format_ext,expected_in_content",
[
("json", None),
("md", ["# Schema for", "```sql"]),
("", ["Schema for", "CREATE TABLE"]),
],
)
async def test_database_schema_formats(schema_ds, format_ext, expected_in_content):
"""Test /database/-/schema endpoint in different formats."""
url = "/schema_public_db/-/schema"
if format_ext:
url += f".{format_ext}"
response = await schema_ds.client.get(url)
assert response.status_code == 200
if format_ext == "json":
data = response.json()
assert "database" in data
assert data["database"] == "schema_public_db"
assert "schema" in data
assert "CREATE TABLE users" in data["schema"]
else:
content = response.text
for expected in expected_in_content:
assert expected in content
@pytest.mark.asyncio
@pytest.mark.parametrize(
"format_ext,expected_in_content",
[
("json", None),
("md", ["# Schema for", "```sql"]),
("", ["Schema for all databases"]),
],
)
async def test_instance_schema_formats(schema_ds, format_ext, expected_in_content):
"""Test /-/schema endpoint in different formats."""
url = "/-/schema"
if format_ext:
url += f".{format_ext}"
response = await schema_ds.client.get(url)
assert response.status_code == 200
if format_ext == "json":
data = response.json()
assert "schemas" in data
assert isinstance(data["schemas"], list)
db_names = [item["database"] for item in data["schemas"]]
# Should see schema_public_db and schema_empty_db, but not schema_private_db (anonymous user)
assert "schema_public_db" in db_names
assert "schema_empty_db" in db_names
assert "schema_private_db" not in db_names
# Check schemas are present
for item in data["schemas"]:
if item["database"] == "schema_public_db":
assert "CREATE TABLE users" in item["schema"]
else:
content = response.text
for expected in expected_in_content:
assert expected in content
@pytest.mark.asyncio
@pytest.mark.parametrize(
"format_ext,expected_in_content",
[
("json", None),
("md", ["# Schema for", "```sql"]),
("", ["Schema for users"]),
],
)
async def test_table_schema_formats(schema_ds, format_ext, expected_in_content):
"""Test /database/table/-/schema endpoint in different formats."""
url = "/schema_public_db/users/-/schema"
if format_ext:
url += f".{format_ext}"
response = await schema_ds.client.get(url)
assert response.status_code == 200
if format_ext == "json":
data = response.json()
assert "database" in data
assert data["database"] == "schema_public_db"
assert "table" in data
assert data["table"] == "users"
assert "schema" in data
assert "CREATE TABLE users" in data["schema"]
else:
content = response.text
for expected in expected_in_content:
assert expected in content
@pytest.mark.asyncio
@pytest.mark.parametrize(
"url",
[
"/schema_private_db/-/schema.json",
"/schema_private_db/secret_data/-/schema.json",
],
)
async def test_schema_permission_enforcement(schema_ds, url):
"""Test that permissions are enforced for schema endpoints."""
# Anonymous user should get 403
response = await schema_ds.client.get(url)
assert response.status_code == 403
# Authenticated user with permission should succeed
response = await schema_ds.client.get(
url,
cookies={"ds_actor": schema_ds.client.actor_cookie({"id": "root"})},
)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_instance_schema_respects_database_permissions(schema_ds):
"""Test that /-/schema only shows databases the user can view."""
# Anonymous user should only see public databases
response = await schema_ds.client.get("/-/schema.json")
assert response.status_code == 200
data = response.json()
db_names = [item["database"] for item in data["schemas"]]
assert "schema_public_db" in db_names
assert "schema_empty_db" in db_names
assert "schema_private_db" not in db_names
# Authenticated user should see all databases
response = await schema_ds.client.get(
"/-/schema.json",
cookies={"ds_actor": schema_ds.client.actor_cookie({"id": "root"})},
)
assert response.status_code == 200
data = response.json()
db_names = [item["database"] for item in data["schemas"]]
assert "schema_public_db" in db_names
assert "schema_empty_db" in db_names
assert "schema_private_db" in db_names
@pytest.mark.asyncio
async def test_database_schema_with_multiple_tables(schema_ds):
"""Test schema with multiple tables in a database."""
response = await schema_ds.client.get("/schema_public_db/-/schema.json")
assert response.status_code == 200
data = response.json()
schema = data["schema"]
# All objects should be in the schema
assert "CREATE TABLE users" in schema
assert "CREATE TABLE posts" in schema
assert "CREATE VIEW recent_posts" in schema
@pytest.mark.asyncio
async def test_empty_database_schema(schema_ds):
"""Test schema for an empty database."""
response = await schema_ds.client.get("/schema_empty_db/-/schema.json")
assert response.status_code == 200
data = response.json()
assert data["database"] == "schema_empty_db"
assert data["schema"] == ""
@pytest.mark.asyncio
async def test_database_not_exists(schema_ds):
"""Test schema for a non-existent database returns 404."""
# Test JSON format
response = await schema_ds.client.get("/nonexistent_db/-/schema.json")
assert response.status_code == 404
data = response.json()
assert data["ok"] is False
assert "not found" in data["error"].lower()
# Test HTML format (returns text)
response = await schema_ds.client.get("/nonexistent_db/-/schema")
assert response.status_code == 404
assert "not found" in response.text.lower()
# Test Markdown format (returns text)
response = await schema_ds.client.get("/nonexistent_db/-/schema.md")
assert response.status_code == 404
assert "not found" in response.text.lower()
@pytest.mark.asyncio
async def test_table_not_exists(schema_ds):
"""Test schema for a non-existent table returns 404."""
# Test JSON format
response = await schema_ds.client.get("/schema_public_db/nonexistent/-/schema.json")
assert response.status_code == 404
data = response.json()
assert data["ok"] is False
assert "not found" in data["error"].lower()
# Test HTML format (returns text)
response = await schema_ds.client.get("/schema_public_db/nonexistent/-/schema")
assert response.status_code == 404
assert "not found" in response.text.lower()
# Test Markdown format (returns text)
response = await schema_ds.client.get("/schema_public_db/nonexistent/-/schema.md")
assert response.status_code == 404
assert "not found" in response.text.lower()