register_permissions() plugin hook (#1940)

* Docs for permissions: in metadata, refs #1636
* Refactor default_permissions.py to help with implementation of #1636
* register_permissions() plugin hook, closes #1939 - also refs #1938
* Tests for register_permissions() hook, refs #1939
* Documentation for datasette.permissions, refs #1939
* permission_allowed() falls back on Permission.default, refs #1939
* Raise StartupError on duplicate permissions
* Allow dupe permisisons if exact matches
This commit is contained in:
Simon Willison 2022-12-12 18:05:54 -08:00 committed by GitHub
commit 8bf06a76b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 513 additions and 88 deletions

View file

@ -90,6 +90,13 @@ def check_permission_actions_are_documented():
def before(hook_name, hook_impls, kwargs):
if hook_name == "permission_allowed":
datasette = kwargs["datasette"]
assert kwargs["action"] in datasette.permissions, (
"'{}' has not been registered with register_permissions()".format(
kwargs["action"]
)
+ " (or maybe a test forgot to do await ds.invoke_startup())"
)
action = kwargs.get("action").replace("-", "_")
assert (
action in documented_permission_actions

View file

@ -48,6 +48,7 @@ EXPECTED_PLUGINS = [
"prepare_jinja2_environment",
"register_facet_classes",
"register_magic_parameters",
"register_permissions",
"register_routes",
"render_cell",
"skip_csrf",

View file

@ -1,5 +1,5 @@
import asyncio
from datasette import hookimpl
from datasette import hookimpl, Permission
from datasette.facets import Facet
from datasette import tracer
from datasette.utils import path_with_added_args
@ -406,3 +406,31 @@ def database_actions(datasette, database, actor, request):
@hookimpl
def skip_csrf(scope):
return scope["path"] == "/skip-csrf"
@hookimpl
def register_permissions(datasette):
extras = datasette.plugin_config("datasette-register-permissions") or {}
permissions = [
Permission(
name="new-permission",
abbr="np",
description="New permission",
takes_database=True,
takes_resource=False,
default=False,
)
]
if extras:
permissions.extend(
Permission(
name=p["name"],
abbr=p["abbr"],
description=p["description"],
takes_database=p["takes_database"],
takes_resource=p["takes_resource"],
default=p["default"],
)
for p in extras["permissions"]
)
return permissions

View file

@ -126,6 +126,7 @@ async def test_through_filters_from_request(app_client):
@pytest.mark.asyncio
async def test_where_filters_from_request(app_client):
await app_client.ds.invoke_startup()
request = Request.fake("/?_where=pk+>+3")
filter_args = await (
where_filters(

View file

@ -116,6 +116,7 @@ async def test_datasette_ensure_permissions_check_visibility(
actor, metadata, permissions, should_allow, expected_private
):
ds = Datasette([], memory=True, metadata=metadata)
await ds.invoke_startup()
if not should_allow:
with pytest.raises(Forbidden):
await ds.ensure_permissions(actor, permissions)

View file

@ -1,3 +1,4 @@
import collections
from datasette.app import Datasette
from .fixtures import app_client, assert_permissions_checked, make_app_client
from bs4 import BeautifulSoup as Soup
@ -640,3 +641,49 @@ async def test_actor_restricted_permissions(
"result": expected_result,
}
assert response.json() == expected
PermMetadataTestCase = collections.namedtuple(
"PermMetadataTestCase",
"metadata,actor,action,resource,default,expected_result",
)
@pytest.mark.asyncio
@pytest.mark.xfail(reason="Not implemented yet")
@pytest.mark.parametrize(
"metadata,actor,action,resource,default,expected_result",
(
# Simple view-instance default=True example
PermMetadataTestCase(
metadata={},
actor=None,
action="view-instance",
resource=None,
default=True,
expected_result=True,
),
# debug-menu on root
PermMetadataTestCase(
metadata={"permissions": {"debug-menu": {"id": "user"}}},
actor={"id": "user"},
action="debug-menu",
resource=None,
default=False,
expected_result=True,
),
),
)
async def test_permissions_in_metadata(
perms_ds, metadata, actor, action, resource, default, expected_result
):
previous_metadata = perms_ds.metadata()
updated_metadata = copy.deepcopy(previous_metadata)
updated_metadata.update(metadata)
try:
result = await perms_ds.permission_allowed(
actor, action, resource, default=default
)
assert result == expected_result
finally:
perms_ds._metadata_local = previous_metadata

View file

@ -4,15 +4,16 @@ from .fixtures import (
make_app_client,
TABLES,
TEMP_PLUGIN_SECRET_FILE,
PLUGINS_DIR,
TestClient as _TestClient,
) # noqa
from click.testing import CliRunner
from datasette.app import Datasette
from datasette import cli, hookimpl
from datasette import cli, hookimpl, Permission
from datasette.filters import FilterArguments
from datasette.plugins import get_plugins, DEFAULT_PLUGINS, pm
from datasette.utils.sqlite import sqlite3
from datasette.utils import CustomRow
from datasette.utils import CustomRow, StartupError
from jinja2.environment import Template
import base64
import importlib
@ -635,14 +636,32 @@ def test_existing_scope_actor_respected(app_client):
("this_is_denied", False),
("this_is_allowed_async", True),
("this_is_denied_async", False),
("no_match", None),
],
)
async def test_hook_permission_allowed(app_client, action, expected):
actual = await app_client.ds.permission_allowed(
{"id": "actor"}, action, default=None
)
assert expected == actual
async def test_hook_permission_allowed(action, expected):
class TestPlugin:
__name__ = "TestPlugin"
@hookimpl
def register_permissions(self):
return [
Permission(name, None, None, False, False, False)
for name in (
"this_is_allowed",
"this_is_denied",
"this_is_allowed_async",
"this_is_denied_async",
)
]
pm.register(TestPlugin(), name="undo_register_extras")
try:
ds = Datasette(plugins_dir=PLUGINS_DIR)
await ds.invoke_startup()
actual = await ds.permission_allowed({"id": "actor"}, action)
assert expected == actual
finally:
pm.unregister(name="undo_register_extras")
def test_actor_json(app_client):
@ -1023,3 +1042,125 @@ def test_hook_filters_from_request(app_client):
json_response = app_client.get("/fixtures/facetable.json?_nothing=1")
assert json_response.json["rows"] == []
pm.unregister(name="ReturnNothingPlugin")
@pytest.mark.asyncio
@pytest.mark.parametrize("extra_metadata", (False, True))
async def test_hook_register_permissions(extra_metadata):
ds = Datasette(
metadata={
"plugins": {
"datasette-register-permissions": {
"permissions": [
{
"name": "extra-from-metadata",
"abbr": "efm",
"description": "Extra from metadata",
"takes_database": False,
"takes_resource": False,
"default": True,
}
]
}
}
}
if extra_metadata
else None,
plugins_dir=PLUGINS_DIR,
)
await ds.invoke_startup()
assert ds.permissions["new-permission"] == Permission(
name="new-permission",
abbr="np",
description="New permission",
takes_database=True,
takes_resource=False,
default=False,
)
if extra_metadata:
assert ds.permissions["extra-from-metadata"] == Permission(
name="extra-from-metadata",
abbr="efm",
description="Extra from metadata",
takes_database=False,
takes_resource=False,
default=True,
)
else:
assert "extra-from-metadata" not in ds.permissions
@pytest.mark.asyncio
@pytest.mark.parametrize("duplicate", ("name", "abbr"))
async def test_hook_register_permissions_no_duplicates(duplicate):
name1, name2 = "name1", "name2"
abbr1, abbr2 = "abbr1", "abbr2"
if duplicate == "name":
name2 = "name1"
if duplicate == "abbr":
abbr2 = "abbr1"
ds = Datasette(
metadata={
"plugins": {
"datasette-register-permissions": {
"permissions": [
{
"name": name1,
"abbr": abbr1,
"description": None,
"takes_database": False,
"takes_resource": False,
"default": True,
},
{
"name": name2,
"abbr": abbr2,
"description": None,
"takes_database": False,
"takes_resource": False,
"default": True,
},
]
}
}
},
plugins_dir=PLUGINS_DIR,
)
# This should error:
with pytest.raises(StartupError) as ex:
await ds.invoke_startup()
assert "Duplicate permission {}".format(duplicate) in str(ex.value)
@pytest.mark.asyncio
async def test_hook_register_permissions_allows_identical_duplicates():
ds = Datasette(
metadata={
"plugins": {
"datasette-register-permissions": {
"permissions": [
{
"name": "name1",
"abbr": "abbr1",
"description": None,
"takes_database": False,
"takes_resource": False,
"default": True,
},
{
"name": "name1",
"abbr": "abbr1",
"description": None,
"takes_database": False,
"takes_resource": False,
"default": True,
},
]
}
}
},
plugins_dir=PLUGINS_DIR,
)
await ds.invoke_startup()
# Check that ds.permissions has only one of each
assert len([p for p in ds.permissions.values() if p.abbr == "abbr1"]) == 1