BaseView.check_permissions is now datasette.ensure_permissions, closes #1675

Refs #1660
This commit is contained in:
Simon Willison 2022-03-21 10:13:16 -07:00
commit e627510b76
5 changed files with 71 additions and 36 deletions

View file

@ -1,4 +1,5 @@
import asyncio
from typing import Sequence, Union, Tuple
import asgi_csrf
import collections
import datetime
@ -628,6 +629,40 @@ class Datasette:
)
return result
async def ensure_permissions(
self,
actor: dict,
permissions: Sequence[Union[Tuple[str, Union[str, Tuple[str, str]]], str]],
):
"""
permissions is a list of (action, resource) tuples or 'action' strings
Raises datasette.Forbidden() if any of the checks fail
"""
for permission in permissions:
if isinstance(permission, str):
action = permission
resource = None
elif isinstance(permission, (tuple, list)) and len(permission) == 2:
action, resource = permission
else:
assert (
False
), "permission should be string or tuple of two items: {}".format(
repr(permission)
)
ok = await self.permission_allowed(
actor,
action,
resource=resource,
default=None,
)
if ok is not None:
if ok:
return
else:
raise Forbidden(action)
async def execute(
self,
db_name,

View file

@ -76,32 +76,6 @@ class BaseView:
if not ok:
raise Forbidden(action)
async def check_permissions(self, request, permissions):
"""permissions is a list of (action, resource) tuples or 'action' strings"""
for permission in permissions:
if isinstance(permission, str):
action = permission
resource = None
elif isinstance(permission, (tuple, list)) and len(permission) == 2:
action, resource = permission
else:
assert (
False
), "permission should be string or tuple of two items: {}".format(
repr(permission)
)
ok = await self.ds.permission_allowed(
request.actor,
action,
resource=resource,
default=None,
)
if ok is not None:
if ok:
return
else:
raise Forbidden(action)
def database_color(self, database):
return "ff0000"

View file

@ -39,8 +39,8 @@ class DatabaseView(DataView):
raise NotFound("Database not found: {}".format(database_route))
database = db.name
await self.check_permissions(
request,
await self.ds.ensure_permissions(
request.actor,
[
("view-database", database),
"view-instance",
@ -164,8 +164,8 @@ class DatabaseDownload(DataView):
async def get(self, request):
database = tilde_decode(request.url_vars["database"])
await self.check_permissions(
request,
await self.ds.ensure_permissions(
request.actor,
[
("view-database-download", database),
("view-database", database),
@ -217,8 +217,8 @@ class QueryView(DataView):
private = False
if canned_query:
# Respect canned query permissions
await self.check_permissions(
request,
await self.ds.ensure_permissions(
request.actor,
[
("view-query", (database, canned_query)),
("view-database", database),

View file

@ -360,8 +360,8 @@ class TableView(RowTableShared):
raise NotFound(f"Table not found: {table}")
# Ensure user has permission to view this table
await self.check_permissions(
request,
await self.ds.ensure_permissions(
request.actor,
[
("view-table", (database, table)),
("view-database", database),
@ -950,8 +950,8 @@ class RowView(RowTableShared):
except KeyError:
raise NotFound("Database not found: {}".format(database_route))
database = db.name
await self.check_permissions(
request,
await self.ds.ensure_permissions(
request.actor,
[
("view-table", (database, table)),
("view-database", database),