mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
Remove ensure_permissions() and simplify check_visibility()
This commit removes the ensure_permissions() method entirely and updates all code to use direct allowed() checks instead. Key changes: - Removed ensure_permissions() method from datasette/app.py - Simplified check_visibility() to check single permissions directly - Replaced all ensure_permissions() calls with direct allowed() checks - Updated all check_visibility() calls to use only primary permission - Added Forbidden import to index.py Why this change: - ensure_permissions() used OR logic (any permission passes) which conflicted with explicit denies in the config - For example, check_visibility() called ensure_permissions() with ["view-database", "view-instance"] and if view-instance passed, it would show pages even with explicit database deny - The new approach checks only the specific permission needed for each resource, respecting explicit denies Test improvements: 64 failures → 41 failures 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
30e2f9064b
commit
6584c9e03f
7 changed files with 83 additions and 128 deletions
|
|
@ -1041,79 +1041,42 @@ class Datasette:
|
|||
for hook in pm.hook.track_event(datasette=self, event=event):
|
||||
await await_me_maybe(hook)
|
||||
|
||||
async def ensure_permissions(
|
||||
self,
|
||||
actor: dict,
|
||||
permissions: Sequence[Union[Tuple[str, Union[str, Tuple[str, str]]], str]],
|
||||
):
|
||||
"""
|
||||
permissions is a list of (action, resource) tuples or 'action' strings
|
||||
|
||||
Raises datasette.Forbidden() if any of the checks fail
|
||||
"""
|
||||
assert actor is None or isinstance(actor, dict), "actor must be None or a dict"
|
||||
for permission in permissions:
|
||||
if isinstance(permission, str):
|
||||
action = permission
|
||||
resource_obj = None
|
||||
elif isinstance(permission, (tuple, list)) and len(permission) == 2:
|
||||
action, resource = permission
|
||||
# Convert old-style resource to Resource object
|
||||
if isinstance(resource, str):
|
||||
resource_obj = DatabaseResource(database=resource)
|
||||
elif isinstance(resource, (tuple, list)) and len(resource) == 2:
|
||||
resource_obj = TableResource(database=resource[0], table=resource[1])
|
||||
else:
|
||||
resource_obj = None
|
||||
else:
|
||||
assert (
|
||||
False
|
||||
), "permission should be string or tuple of two items: {}".format(
|
||||
repr(permission)
|
||||
)
|
||||
ok = await self.allowed(
|
||||
action=action,
|
||||
resource=resource_obj,
|
||||
actor=actor,
|
||||
)
|
||||
if ok:
|
||||
return
|
||||
# If we got here, none of the permissions were granted
|
||||
# Raise Forbidden with the first action
|
||||
first_permission = permissions[0]
|
||||
if isinstance(first_permission, str):
|
||||
first_action = first_permission
|
||||
else:
|
||||
first_action = first_permission[0]
|
||||
raise Forbidden(first_action)
|
||||
|
||||
async def check_visibility(
|
||||
self,
|
||||
actor: dict,
|
||||
action: Optional[str] = None,
|
||||
action: str,
|
||||
resource: Optional[Union[str, Tuple[str, str]]] = None,
|
||||
permissions: Optional[
|
||||
Sequence[Union[Tuple[str, Union[str, Tuple[str, str]]], str]]
|
||||
] = None,
|
||||
):
|
||||
"""Returns (visible, private) - visible = can you see it, private = can others see it too"""
|
||||
if permissions:
|
||||
assert (
|
||||
not action and not resource
|
||||
), "Can't use action= or resource= with permissions="
|
||||
"""
|
||||
Check if actor can see a resource and if it's private.
|
||||
|
||||
Returns (visible, private) tuple:
|
||||
- visible: bool - can the actor see it?
|
||||
- private: bool - if visible, can anonymous users NOT see it?
|
||||
"""
|
||||
from datasette.resources import DatabaseResource, TableResource
|
||||
|
||||
# Convert old-style resource to Resource object
|
||||
if resource is None:
|
||||
resource_obj = None
|
||||
elif isinstance(resource, str):
|
||||
resource_obj = DatabaseResource(database=resource)
|
||||
elif isinstance(resource, tuple) and len(resource) == 2:
|
||||
resource_obj = TableResource(database=resource[0], table=resource[1])
|
||||
else:
|
||||
permissions = [(action, resource)]
|
||||
try:
|
||||
await self.ensure_permissions(actor, permissions)
|
||||
except Forbidden:
|
||||
resource_obj = None
|
||||
|
||||
# Check if actor can see it
|
||||
if not await self.allowed(action=action, resource=resource_obj, actor=actor):
|
||||
return False, False
|
||||
# User can see it, but can the anonymous user see it?
|
||||
try:
|
||||
await self.ensure_permissions(None, permissions)
|
||||
except Forbidden:
|
||||
# It's visible but private
|
||||
|
||||
# Check if anonymous user can see it (for "private" flag)
|
||||
if not await self.allowed(action=action, resource=resource_obj, actor=None):
|
||||
# Actor can see it but anonymous cannot - it's private
|
||||
return True, True
|
||||
# It's visible to everyone
|
||||
|
||||
# Both actor and anonymous can see it - it's public
|
||||
return True, False
|
||||
|
||||
async def allowed_resources_sql(
|
||||
|
|
|
|||
|
|
@ -50,10 +50,8 @@ class DatabaseView(View):
|
|||
|
||||
visible, private = await datasette.check_visibility(
|
||||
request.actor,
|
||||
permissions=[
|
||||
("view-database", database),
|
||||
"view-instance",
|
||||
],
|
||||
action="view-database",
|
||||
resource=database,
|
||||
)
|
||||
if not visible:
|
||||
raise Forbidden("You do not have permission to view this database")
|
||||
|
|
@ -96,11 +94,8 @@ class DatabaseView(View):
|
|||
).values():
|
||||
query_visible, query_private = await datasette.check_visibility(
|
||||
request.actor,
|
||||
permissions=[
|
||||
("view-query", (database, query["name"])),
|
||||
("view-database", database),
|
||||
"view-instance",
|
||||
],
|
||||
action="view-query",
|
||||
resource=(database, query["name"]),
|
||||
)
|
||||
if query_visible:
|
||||
canned_queries.append(dict(query, private=query_private))
|
||||
|
|
@ -370,15 +365,15 @@ async def get_tables(datasette, request, db, allowed_dict):
|
|||
|
||||
|
||||
async def database_download(request, datasette):
|
||||
from datasette.resources import DatabaseResource
|
||||
|
||||
database = tilde_decode(request.url_vars["database"])
|
||||
await datasette.ensure_permissions(
|
||||
request.actor,
|
||||
[
|
||||
("view-database-download", database),
|
||||
("view-database", database),
|
||||
"view-instance",
|
||||
],
|
||||
)
|
||||
if not await datasette.allowed(
|
||||
action="view-database-download",
|
||||
resource=DatabaseResource(database=database),
|
||||
actor=request.actor,
|
||||
):
|
||||
raise Forbidden("view-database-download")
|
||||
try:
|
||||
db = datasette.get_database(route=database)
|
||||
except KeyError:
|
||||
|
|
@ -540,19 +535,19 @@ class QueryView(View):
|
|||
# Respect canned query permissions
|
||||
visible, private = await datasette.check_visibility(
|
||||
request.actor,
|
||||
permissions=[
|
||||
("view-query", (database, canned_query["name"])),
|
||||
("view-database", database),
|
||||
"view-instance",
|
||||
],
|
||||
action="view-query",
|
||||
resource=(database, canned_query["name"]),
|
||||
)
|
||||
if not visible:
|
||||
raise Forbidden("You do not have permission to view this query")
|
||||
|
||||
else:
|
||||
await datasette.ensure_permissions(
|
||||
request.actor, [("execute-sql", database)]
|
||||
)
|
||||
if not await datasette.allowed(
|
||||
action="execute-sql",
|
||||
resource=DatabaseResource(database=database),
|
||||
actor=request.actor,
|
||||
):
|
||||
raise Forbidden("execute-sql")
|
||||
|
||||
# Flattened because of ?sql=&name1=value1&name2=value2 feature
|
||||
params = {key: request.args.get(key) for key in request.args}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import json
|
||||
|
||||
from datasette import Forbidden
|
||||
from datasette.plugins import pm
|
||||
from datasette.utils import (
|
||||
add_cors_headers,
|
||||
|
|
@ -25,7 +26,8 @@ class IndexView(BaseView):
|
|||
|
||||
async def get(self, request):
|
||||
as_format = request.url_vars["format"]
|
||||
await self.ds.ensure_permissions(request.actor, ["view-instance"])
|
||||
if not await self.ds.allowed(action="view-instance", actor=request.actor):
|
||||
raise Forbidden("view-instance")
|
||||
|
||||
# Get all allowed databases and tables in bulk
|
||||
allowed_databases = await self.ds.allowed_resources(
|
||||
|
|
|
|||
|
|
@ -28,11 +28,8 @@ class RowView(DataView):
|
|||
# Ensure user has permission to view this row
|
||||
visible, private = await self.ds.check_visibility(
|
||||
request.actor,
|
||||
permissions=[
|
||||
("view-table", (database, table)),
|
||||
("view-database", database),
|
||||
"view-instance",
|
||||
],
|
||||
action="view-table",
|
||||
resource=(database, table),
|
||||
)
|
||||
if not visible:
|
||||
raise Forbidden("You do not have permission to view this table")
|
||||
|
|
|
|||
|
|
@ -44,7 +44,8 @@ class JsonDataView(BaseView):
|
|||
|
||||
async def get(self, request):
|
||||
if self.permission:
|
||||
await self.ds.ensure_permissions(request.actor, [self.permission])
|
||||
if not await self.ds.allowed(action=self.permission, actor=request.actor):
|
||||
raise Forbidden(self.permission)
|
||||
if self.needs_request:
|
||||
data = self.data_callback(request)
|
||||
else:
|
||||
|
|
@ -54,7 +55,8 @@ class JsonDataView(BaseView):
|
|||
|
||||
class PatternPortfolioView(View):
|
||||
async def get(self, request, datasette):
|
||||
await datasette.ensure_permissions(request.actor, ["view-instance"])
|
||||
if not await datasette.allowed(action="view-instance", actor=request.actor):
|
||||
raise Forbidden("view-instance")
|
||||
return Response.html(
|
||||
await datasette.render_template(
|
||||
"patterns.html",
|
||||
|
|
@ -112,7 +114,8 @@ class PermissionsDebugView(BaseView):
|
|||
has_json_alternate = False
|
||||
|
||||
async def get(self, request):
|
||||
await self.ds.ensure_permissions(request.actor, ["view-instance"])
|
||||
if not await self.ds.allowed(action="view-instance", actor=request.actor):
|
||||
raise Forbidden("view-instance")
|
||||
if not await self.ds.allowed(action="permissions-debug", actor=request.actor):
|
||||
raise Forbidden("Permission denied")
|
||||
filter_ = request.args.get("filter") or "all"
|
||||
|
|
@ -151,7 +154,8 @@ class PermissionsDebugView(BaseView):
|
|||
)
|
||||
|
||||
async def post(self, request):
|
||||
await self.ds.ensure_permissions(request.actor, ["view-instance"])
|
||||
if not await self.ds.allowed(action="view-instance", actor=request.actor):
|
||||
raise Forbidden("view-instance")
|
||||
if not await self.ds.allowed(action="permissions-debug", actor=request.actor):
|
||||
raise Forbidden("Permission denied")
|
||||
vars = await request.post_vars()
|
||||
|
|
@ -362,7 +366,8 @@ class PermissionRulesView(BaseView):
|
|||
has_json_alternate = False
|
||||
|
||||
async def get(self, request):
|
||||
await self.ds.ensure_permissions(request.actor, ["view-instance"])
|
||||
if not await self.ds.allowed(action="view-instance", actor=request.actor):
|
||||
raise Forbidden("view-instance")
|
||||
if not await self.ds.allowed(action="permissions-debug", actor=request.actor):
|
||||
raise Forbidden("Permission denied")
|
||||
|
||||
|
|
@ -607,11 +612,13 @@ class MessagesDebugView(BaseView):
|
|||
has_json_alternate = False
|
||||
|
||||
async def get(self, request):
|
||||
await self.ds.ensure_permissions(request.actor, ["view-instance"])
|
||||
if not await self.ds.allowed(action="view-instance", actor=request.actor):
|
||||
raise Forbidden("view-instance")
|
||||
return await self.render(["messages_debug.html"], request)
|
||||
|
||||
async def post(self, request):
|
||||
await self.ds.ensure_permissions(request.actor, ["view-instance"])
|
||||
if not await self.ds.allowed(action="view-instance", actor=request.actor):
|
||||
raise Forbidden("view-instance")
|
||||
post = await request.post_vars()
|
||||
message = post.get("message", "")
|
||||
message_type = post.get("message_type") or "INFO"
|
||||
|
|
@ -774,7 +781,7 @@ class ApiExplorerView(BaseView):
|
|||
if name == "_internal":
|
||||
continue
|
||||
database_visible, _ = await self.ds.check_visibility(
|
||||
request.actor, permissions=[("view-database", name), "view-instance"]
|
||||
request.actor, action="view-database", resource=name
|
||||
)
|
||||
if not database_visible:
|
||||
continue
|
||||
|
|
@ -783,11 +790,8 @@ class ApiExplorerView(BaseView):
|
|||
for table in table_names:
|
||||
visible, _ = await self.ds.check_visibility(
|
||||
request.actor,
|
||||
permissions=[
|
||||
("view-table", (name, table)),
|
||||
("view-database", name),
|
||||
"view-instance",
|
||||
],
|
||||
action="view-table",
|
||||
resource=(name, table),
|
||||
)
|
||||
if not visible:
|
||||
continue
|
||||
|
|
@ -886,7 +890,7 @@ class ApiExplorerView(BaseView):
|
|||
async def get(self, request):
|
||||
visible, private = await self.ds.check_visibility(
|
||||
request.actor,
|
||||
permissions=["view-instance"],
|
||||
action="view-instance",
|
||||
)
|
||||
if not visible:
|
||||
raise Forbidden("You do not have permission to view this instance")
|
||||
|
|
|
|||
|
|
@ -963,11 +963,8 @@ async def table_view_data(
|
|||
# Can this user view it?
|
||||
visible, private = await datasette.check_visibility(
|
||||
request.actor,
|
||||
permissions=[
|
||||
("view-table", (database_name, table_name)),
|
||||
("view-database", database_name),
|
||||
"view-instance",
|
||||
],
|
||||
action="view-table",
|
||||
resource=(database_name, table_name),
|
||||
)
|
||||
if not visible:
|
||||
raise Forbidden("You do not have permission to view this table")
|
||||
|
|
|
|||
|
|
@ -85,21 +85,23 @@ ALLOW_ROOT = {"allow": {"id": "root"}}
|
|||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"actor,config,permissions,should_allow,expected_private",
|
||||
"actor,config,action,resource,should_allow,expected_private",
|
||||
(
|
||||
(None, ALLOW_ROOT, ["view-instance"], False, False),
|
||||
(ROOT, ALLOW_ROOT, ["view-instance"], True, True),
|
||||
(None, ALLOW_ROOT, "view-instance", None, False, False),
|
||||
(ROOT, ALLOW_ROOT, "view-instance", None, True, True),
|
||||
(
|
||||
None,
|
||||
{"databases": {"_memory": ALLOW_ROOT}},
|
||||
[("view-database", "_memory")],
|
||||
"view-database",
|
||||
"_memory",
|
||||
False,
|
||||
False,
|
||||
),
|
||||
(
|
||||
ROOT,
|
||||
{"databases": {"_memory": ALLOW_ROOT}},
|
||||
[("view-database", "_memory")],
|
||||
"view-database",
|
||||
"_memory",
|
||||
True,
|
||||
True,
|
||||
),
|
||||
|
|
@ -107,24 +109,19 @@ ALLOW_ROOT = {"allow": {"id": "root"}}
|
|||
(
|
||||
ROOT,
|
||||
{"allow": True},
|
||||
["view-instance"],
|
||||
"view-instance",
|
||||
None,
|
||||
True,
|
||||
False,
|
||||
),
|
||||
),
|
||||
)
|
||||
async def test_datasette_ensure_permissions_check_visibility(
|
||||
actor, config, permissions, should_allow, expected_private
|
||||
async def test_datasette_check_visibility(
|
||||
actor, config, action, resource, should_allow, expected_private
|
||||
):
|
||||
ds = Datasette([], memory=True, config=config)
|
||||
await ds.invoke_startup()
|
||||
if not should_allow:
|
||||
with pytest.raises(Forbidden):
|
||||
await ds.ensure_permissions(actor, permissions)
|
||||
else:
|
||||
await ds.ensure_permissions(actor, permissions)
|
||||
# And try check_visibility too:
|
||||
visible, private = await ds.check_visibility(actor, permissions=permissions)
|
||||
visible, private = await ds.check_visibility(actor, action=action, resource=resource)
|
||||
assert visible == should_allow
|
||||
assert private == expected_private
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue