datasette/tests/test_permissions.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

953 lines
32 KiB
Python
Raw Permalink Normal View History

import collections
from datasette.app import Datasette
from datasette.cli import cli
from .fixtures import app_client, assert_permissions_checked, make_app_client
from click.testing import CliRunner
from bs4 import BeautifulSoup as Soup
import copy
import json
from pprint import pprint
import pytest_asyncio
import pytest
import re
import time
2020-07-24 15:54:41 -07:00
import urllib
@pytest.fixture(scope="module")
def padlock_client():
with make_app_client(
metadata={
"databases": {
"fixtures": {
"queries": {"two": {"sql": "select 1 + 1"}},
}
}
}
) as client:
yield client
@pytest_asyncio.fixture
async def perms_ds():
ds = Datasette()
await ds.invoke_startup()
one = ds.add_memory_database("perms_ds_one")
two = ds.add_memory_database("perms_ds_two")
await one.execute_write("create table if not exists t1 (id integer primary key)")
await one.execute_write("create table if not exists t2 (id integer primary key)")
await two.execute_write("create table if not exists t1 (id integer primary key)")
return ds
@pytest.mark.parametrize(
"allow,expected_anon,expected_auth",
2020-09-02 15:24:55 -07:00
[
(None, 200, 200),
({}, 403, 403),
({"id": "root"}, 403, 200),
],
)
@pytest.mark.parametrize(
"path",
(
"/",
"/fixtures",
"/fixtures/compound_three_primary_keys",
"/fixtures/compound_three_primary_keys/a,a,a",
"/fixtures/two", # Query
),
)
def test_view_padlock(allow, expected_anon, expected_auth, path, padlock_client):
padlock_client.ds._metadata_local["allow"] = allow
fragment = "🔒</h1>"
anon_response = padlock_client.get(path)
assert expected_anon == anon_response.status
if allow and anon_response.status == 200:
# Should be no padlock
assert fragment not in anon_response.text
auth_response = padlock_client.get(
path,
cookies={"ds_actor": padlock_client.actor_cookie({"id": "root"})},
)
assert expected_auth == auth_response.status
# Check for the padlock
if allow and expected_anon == 403 and expected_auth == 200:
assert fragment in auth_response.text
del padlock_client.ds._metadata_local["allow"]
@pytest.mark.parametrize(
"allow,expected_anon,expected_auth",
2020-09-02 15:24:55 -07:00
[
(None, 200, 200),
({}, 403, 403),
({"id": "root"}, 403, 200),
],
)
def test_view_database(allow, expected_anon, expected_auth):
with make_app_client(
metadata={"databases": {"fixtures": {"allow": allow}}}
) as client:
for path in (
"/fixtures",
"/fixtures/compound_three_primary_keys",
"/fixtures/compound_three_primary_keys/a,a,a",
):
anon_response = client.get(path)
assert expected_anon == anon_response.status, path
if allow and path == "/fixtures" and anon_response.status == 200:
# Should be no padlock
assert ">fixtures 🔒</h1>" not in anon_response.text
auth_response = client.get(
2020-09-02 15:24:55 -07:00
path,
cookies={"ds_actor": client.actor_cookie({"id": "root"})},
)
assert expected_auth == auth_response.status
if (
allow
and path == "/fixtures"
and expected_anon == 403
and expected_auth == 200
):
assert ">fixtures 🔒</h1>" in auth_response.text
def test_database_list_respects_view_database():
with make_app_client(
metadata={"databases": {"fixtures": {"allow": {"id": "root"}}}},
extra_databases={"data.db": "create table names (name text)"},
) as client:
anon_response = client.get("/")
assert '<a href="/data">data</a></h2>' in anon_response.text
assert '<a href="/fixtures">fixtures</a>' not in anon_response.text
auth_response = client.get(
2020-09-02 15:24:55 -07:00
"/",
cookies={"ds_actor": client.actor_cookie({"id": "root"})},
)
assert '<a href="/data">data</a></h2>' in auth_response.text
assert '<a href="/fixtures">fixtures</a> 🔒</h2>' in auth_response.text
2020-06-07 21:47:22 -07:00
def test_database_list_respects_view_table():
with make_app_client(
metadata={
"databases": {
"data": {
"tables": {
"names": {"allow": {"id": "root"}},
"v": {"allow": {"id": "root"}},
}
}
}
},
extra_databases={
"data.db": "create table names (name text); create view v as select * from names"
},
) as client:
html_fragments = [
">names</a> 🔒",
">v</a> 🔒",
]
anon_response_text = client.get("/").text
assert "0 rows in 0 tables" in anon_response_text
for html_fragment in html_fragments:
assert html_fragment not in anon_response_text
auth_response_text = client.get(
2020-09-02 15:24:55 -07:00
"/",
cookies={"ds_actor": client.actor_cookie({"id": "root"})},
).text
for html_fragment in html_fragments:
assert html_fragment in auth_response_text
2020-06-07 21:47:22 -07:00
@pytest.mark.parametrize(
"allow,expected_anon,expected_auth",
2020-09-02 15:24:55 -07:00
[
(None, 200, 200),
({}, 403, 403),
({"id": "root"}, 403, 200),
],
2020-06-07 21:47:22 -07:00
)
def test_view_table(allow, expected_anon, expected_auth):
with make_app_client(
metadata={
"databases": {
"fixtures": {
"tables": {"compound_three_primary_keys": {"allow": allow}}
}
}
}
) as client:
anon_response = client.get("/fixtures/compound_three_primary_keys")
assert expected_anon == anon_response.status
if allow and anon_response.status == 200:
# Should be no padlock
assert ">compound_three_primary_keys 🔒</h1>" not in anon_response.text
2020-06-07 21:47:22 -07:00
auth_response = client.get(
"/fixtures/compound_three_primary_keys",
cookies={"ds_actor": client.actor_cookie({"id": "root"})},
2020-06-07 21:47:22 -07:00
)
assert expected_auth == auth_response.status
if allow and expected_anon == 403 and expected_auth == 200:
assert ">compound_three_primary_keys 🔒</h1>" in auth_response.text
2020-06-07 21:47:22 -07:00
def test_table_list_respects_view_table():
with make_app_client(
metadata={
"databases": {
"fixtures": {
"tables": {
"compound_three_primary_keys": {"allow": {"id": "root"}},
# And a SQL view too:
"paginated_view": {"allow": {"id": "root"}},
}
2020-06-07 21:47:22 -07:00
}
}
}
) as client:
html_fragments = [
">compound_three_primary_keys</a> 🔒",
">paginated_view</a> 🔒",
]
2020-06-07 21:47:22 -07:00
anon_response = client.get("/fixtures")
for html_fragment in html_fragments:
assert html_fragment not in anon_response.text
2020-06-07 21:47:22 -07:00
auth_response = client.get(
"/fixtures", cookies={"ds_actor": client.actor_cookie({"id": "root"})}
2020-06-07 21:47:22 -07:00
)
for html_fragment in html_fragments:
assert html_fragment in auth_response.text
2020-06-07 21:47:22 -07:00
@pytest.mark.parametrize(
"allow,expected_anon,expected_auth",
2020-09-02 15:24:55 -07:00
[
(None, 200, 200),
({}, 403, 403),
({"id": "root"}, 403, 200),
],
2020-06-07 21:47:22 -07:00
)
def test_view_query(allow, expected_anon, expected_auth):
with make_app_client(
metadata={
"databases": {
"fixtures": {"queries": {"q": {"sql": "select 1 + 1", "allow": allow}}}
}
}
) as client:
anon_response = client.get("/fixtures/q")
assert expected_anon == anon_response.status
if allow and anon_response.status == 200:
# Should be no padlock
assert "🔒</h1>" not in anon_response.text
2020-06-07 21:47:22 -07:00
auth_response = client.get(
"/fixtures/q", cookies={"ds_actor": client.actor_cookie({"id": "root"})}
2020-06-07 21:47:22 -07:00
)
assert expected_auth == auth_response.status
if allow and expected_anon == 403 and expected_auth == 200:
assert ">fixtures: q 🔒</h1>" in auth_response.text
2020-06-07 21:47:22 -07:00
@pytest.mark.parametrize(
"metadata",
[
{"allow_sql": {"id": "root"}},
{"databases": {"fixtures": {"allow_sql": {"id": "root"}}}},
],
)
def test_execute_sql(metadata):
schema_re = re.compile("const schema = ({.*?});", re.DOTALL)
with make_app_client(metadata=metadata) as client:
form_fragment = '<form class="sql" action="/fixtures"'
# Anonymous users - should not display the form:
anon_html = client.get("/fixtures").text
assert form_fragment not in anon_html
# And const schema should be an empty object:
assert "const schema = {};" in anon_html
# This should 403:
assert client.get("/fixtures?sql=select+1").status == 403
# ?_where= not allowed on tables:
assert client.get("/fixtures/facet_cities?_where=id=3").status == 403
# But for logged in user all of these should work:
cookies = {"ds_actor": client.actor_cookie({"id": "root"})}
response_text = client.get("/fixtures", cookies=cookies).text
# Extract the schema= portion of the JavaScript
schema_json = schema_re.search(response_text).group(1)
schema = json.loads(schema_json)
assert set(schema["attraction_characteristic"]) == {"name", "pk"}
assert schema["paginated_view"] == []
assert form_fragment in response_text
query_response = client.get("/fixtures?sql=select+1", cookies=cookies)
assert query_response.status == 200
schema2 = json.loads(schema_re.search(query_response.text).group(1))
assert set(schema2["attraction_characteristic"]) == {"name", "pk"}
assert (
client.get("/fixtures/facet_cities?_where=id=3", cookies=cookies).status
== 200
)
2020-06-07 21:47:22 -07:00
def test_query_list_respects_view_query():
with make_app_client(
metadata={
"databases": {
"fixtures": {
"queries": {"q": {"sql": "select 1 + 1", "allow": {"id": "root"}}}
}
}
}
) as client:
html_fragment = '<li><a href="/fixtures/q" title="select 1 + 1">q</a> 🔒</li>'
anon_response = client.get("/fixtures")
assert html_fragment not in anon_response.text
assert '"/fixtures/q"' not in anon_response.text
auth_response = client.get(
"/fixtures", cookies={"ds_actor": client.actor_cookie({"id": "root"})}
2020-06-07 21:47:22 -07:00
)
assert html_fragment in auth_response.text
@pytest.mark.parametrize(
"path,permissions",
[
("/", ["view-instance"]),
2020-06-08 12:02:56 -07:00
("/fixtures", ["view-instance", ("view-database", "fixtures")]),
(
"/fixtures/facetable/1",
2020-06-08 12:02:56 -07:00
["view-instance", ("view-table", ("fixtures", "facetable"))],
),
(
"/fixtures/simple_primary_key",
[
"view-instance",
2020-06-08 12:02:56 -07:00
("view-database", "fixtures"),
("view-table", ("fixtures", "simple_primary_key")),
],
),
(
"/fixtures?sql=select+1",
[
"view-instance",
2020-06-08 12:02:56 -07:00
("view-database", "fixtures"),
("execute-sql", "fixtures"),
],
),
(
"/fixtures.db",
[
"view-instance",
2020-06-08 12:02:56 -07:00
("view-database", "fixtures"),
("view-database-download", "fixtures"),
],
),
(
"/fixtures/neighborhood_search",
[
"view-instance",
2020-06-08 12:02:56 -07:00
("view-database", "fixtures"),
("view-query", ("fixtures", "neighborhood_search")),
],
),
],
)
def test_permissions_checked(app_client, path, permissions):
# Needs file-backed app_client for /fixtures.db
app_client.ds._permission_checks.clear()
response = app_client.get(path)
assert response.status_code in (200, 403)
assert_permissions_checked(app_client.ds, permissions)
@pytest.mark.asyncio
async def test_permissions_debug(ds_client):
ds_client.ds._permission_checks.clear()
assert (await ds_client.get("/-/permissions")).status_code == 403
# With the cookie it should work
cookie = ds_client.actor_cookie({"id": "root"})
response = await ds_client.get("/-/permissions", cookies={"ds_actor": cookie})
assert response.status_code == 200
# Should show one failure and one success
soup = Soup(response.text, "html.parser")
check_divs = soup.findAll("div", {"class": "check"})
checks = [
{
"action": div.select_one(".check-action").text,
# True = green tick, False = red cross, None = gray None
"result": None
if div.select(".check-result-no-opinion")
else bool(div.select(".check-result-true")),
"used_default": bool(div.select(".check-used-default")),
}
for div in check_divs
]
assert checks == [
{"action": "permissions-debug", "result": True, "used_default": False},
{"action": "view-instance", "result": None, "used_default": True},
{"action": "debug-menu", "result": False, "used_default": True},
{"action": "view-instance", "result": True, "used_default": True},
{"action": "permissions-debug", "result": False, "used_default": True},
{"action": "view-instance", "result": None, "used_default": True},
]
@pytest.mark.asyncio
2020-07-24 15:54:41 -07:00
@pytest.mark.parametrize(
"actor,allow,expected_fragment",
[
('{"id":"root"}', "{}", "Result: deny"),
('{"id":"root"}', '{"id": "*"}', "Result: allow"),
('{"', '{"id": "*"}', "Actor JSON error"),
('{"id":"root"}', '"*"}', "Allow JSON error"),
],
)
async def test_allow_debug(ds_client, actor, allow, expected_fragment):
2022-12-16 09:49:26 -08:00
response = await ds_client.get(
2020-07-24 15:54:41 -07:00
"/-/allow-debug?" + urllib.parse.urlencode({"actor": actor, "allow": allow})
)
assert response.status_code == 200
2020-07-24 15:54:41 -07:00
assert expected_fragment in response.text
2020-06-09 12:25:44 -07:00
@pytest.mark.parametrize(
"allow,expected",
2020-09-02 15:24:55 -07:00
[
({"id": "root"}, 403),
({"id": "root", "unauthenticated": True}, 200),
],
2020-06-09 12:25:44 -07:00
)
def test_allow_unauthenticated(allow, expected):
2020-06-09 12:25:44 -07:00
with make_app_client(metadata={"allow": allow}) as client:
assert expected == client.get("/").status
@pytest.fixture(scope="session")
def view_instance_client():
with make_app_client(metadata={"allow": {}}) as client:
yield client
@pytest.mark.parametrize(
"path",
[
"/",
"/fixtures",
"/fixtures/facetable",
"/-/metadata",
"/-/versions",
"/-/plugins",
"/-/settings",
"/-/threads",
"/-/databases",
"/-/permissions",
"/-/messages",
"/-/patterns",
],
)
def test_view_instance(path, view_instance_client):
assert 403 == view_instance_client.get(path).status
if path not in ("/-/permissions", "/-/messages", "/-/patterns"):
assert 403 == view_instance_client.get(path + ".json").status
@pytest.fixture(scope="session")
def cascade_app_client():
with make_app_client(is_immutable=True) as client:
yield client
@pytest.mark.parametrize(
"path,permissions,expected_status",
[
("/", [], 403),
("/", ["instance"], 200),
# Can view table even if not allowed database or instance
("/fixtures/binary_data", [], 403),
("/fixtures/binary_data", ["database"], 403),
("/fixtures/binary_data", ["instance"], 403),
("/fixtures/binary_data", ["table"], 200),
("/fixtures/binary_data", ["table", "database"], 200),
("/fixtures/binary_data", ["table", "database", "instance"], 200),
# ... same for row
("/fixtures/binary_data/1", [], 403),
("/fixtures/binary_data/1", ["database"], 403),
("/fixtures/binary_data/1", ["instance"], 403),
("/fixtures/binary_data/1", ["table"], 200),
("/fixtures/binary_data/1", ["table", "database"], 200),
("/fixtures/binary_data/1", ["table", "database", "instance"], 200),
# Can view query even if not allowed database or instance
("/fixtures/magic_parameters", [], 403),
("/fixtures/magic_parameters", ["database"], 403),
("/fixtures/magic_parameters", ["instance"], 403),
("/fixtures/magic_parameters", ["query"], 200),
("/fixtures/magic_parameters", ["query", "database"], 200),
("/fixtures/magic_parameters", ["query", "database", "instance"], 200),
# Can view database even if not allowed instance
("/fixtures", [], 403),
("/fixtures", ["instance"], 403),
("/fixtures", ["database"], 200),
# Downloading the fixtures.db file
("/fixtures.db", [], 403),
("/fixtures.db", ["instance"], 403),
("/fixtures.db", ["database"], 200),
("/fixtures.db", ["download"], 200),
],
)
def test_permissions_cascade(cascade_app_client, path, permissions, expected_status):
"""Test that e.g. having view-table but NOT view-database lets you view table page, etc"""
allow = {"id": "*"}
deny = {}
previous_metadata = cascade_app_client.ds.metadata()
updated_metadata = copy.deepcopy(previous_metadata)
actor = {"id": "test"}
if "download" in permissions:
actor["can_download"] = 1
try:
# Set up the different allow blocks
updated_metadata["allow"] = allow if "instance" in permissions else deny
updated_metadata["databases"]["fixtures"]["allow"] = (
allow if "database" in permissions else deny
)
updated_metadata["databases"]["fixtures"]["tables"]["binary_data"] = {
"allow": (allow if "table" in permissions else deny)
}
updated_metadata["databases"]["fixtures"]["queries"]["magic_parameters"][
"allow"
] = (allow if "query" in permissions else deny)
cascade_app_client.ds._metadata_local = updated_metadata
response = cascade_app_client.get(
2020-09-02 15:24:55 -07:00
path,
cookies={"ds_actor": cascade_app_client.actor_cookie(actor)},
)
assert (
response.status == expected_status
), "path: {}, permissions: {}, expected_status: {}, status: {}".format(
path, permissions, expected_status, response.status
)
finally:
cascade_app_client.ds._metadata_local = previous_metadata
def test_padlocks_on_database_page(cascade_app_client):
metadata = {
"databases": {
"fixtures": {
"allow": {"id": "test"},
"tables": {
"123_starts_with_digits": {"allow": True},
"simple_view": {"allow": True},
},
"queries": {"query_two": {"allow": True, "sql": "select 2"}},
}
}
}
previous_metadata = cascade_app_client.ds._metadata_local
try:
cascade_app_client.ds._metadata_local = metadata
response = cascade_app_client.get(
"/fixtures",
cookies={"ds_actor": cascade_app_client.actor_cookie({"id": "test"})},
)
# Tables
assert ">123_starts_with_digits</a></h3>" in response.text
assert ">Table With Space In Name</a> 🔒</h3>" in response.text
# Queries
assert ">from_async_hook</a> 🔒</li>" in response.text
assert ">query_two</a></li>" in response.text
# Views
assert ">paginated_view</a> 🔒</li>" in response.text
assert ">simple_view</a></li>" in response.text
finally:
cascade_app_client.ds._metadata_local = previous_metadata
DEF = "USE_DEFAULT"
@pytest.mark.asyncio
@pytest.mark.parametrize(
"actor,permission,resource_1,resource_2,expected_result",
(
# Without restrictions the defaults apply
({"id": "t"}, "view-instance", None, None, DEF),
({"id": "t"}, "view-database", "one", None, DEF),
({"id": "t"}, "view-table", "one", "t1", DEF),
# If there is an _r block, everything gets denied unless explicitly allowed
({"id": "t", "_r": {}}, "view-instance", None, None, False),
({"id": "t", "_r": {}}, "view-database", "one", None, False),
({"id": "t", "_r": {}}, "view-table", "one", "t1", False),
# Explicit allowing works at the "a" for all level:
({"id": "t", "_r": {"a": ["vi"]}}, "view-instance", None, None, DEF),
({"id": "t", "_r": {"a": ["vd"]}}, "view-database", "one", None, DEF),
({"id": "t", "_r": {"a": ["vt"]}}, "view-table", "one", "t1", DEF),
# But not if it's the wrong permission
({"id": "t", "_r": {"a": ["vd"]}}, "view-instance", None, None, False),
({"id": "t", "_r": {"a": ["vi"]}}, "view-database", "one", None, False),
({"id": "t", "_r": {"a": ["vd"]}}, "view-table", "one", "t1", False),
# Works at the "d" for database level:
({"id": "t", "_r": {"d": {"one": ["vd"]}}}, "view-database", "one", None, DEF),
(
{"id": "t", "_r": {"d": {"one": ["vdd"]}}},
"view-database-download",
"one",
None,
DEF,
),
({"id": "t", "_r": {"d": {"one": ["es"]}}}, "execute-sql", "one", None, DEF),
# Works at the "r" for table level:
(
{"id": "t", "_r": {"r": {"one": {"t1": ["vt"]}}}},
"view-table",
"one",
"t1",
DEF,
),
(
{"id": "t", "_r": {"r": {"one": {"t1": ["vt"]}}}},
"view-table",
"one",
"t2",
False,
),
# non-abbreviations should work too
({"id": "t", "_r": {"a": ["view-instance"]}}, "view-instance", None, None, DEF),
(
{"id": "t", "_r": {"d": {"one": ["view-database"]}}},
"view-database",
"one",
None,
DEF,
),
(
{"id": "t", "_r": {"r": {"one": {"t1": ["view-table"]}}}},
"view-table",
"one",
"t1",
DEF,
),
),
)
async def test_actor_restricted_permissions(
perms_ds, actor, permission, resource_1, resource_2, expected_result
):
cookies = {"ds_actor": perms_ds.sign({"a": {"id": "root"}}, "actor")}
csrftoken = (await perms_ds.client.get("/-/permissions", cookies=cookies)).cookies[
"ds_csrftoken"
]
cookies["ds_csrftoken"] = csrftoken
response = await perms_ds.client.post(
"/-/permissions",
data={
"actor": json.dumps(actor),
"permission": permission,
"resource_1": resource_1,
"resource_2": resource_2,
"csrftoken": csrftoken,
},
cookies=cookies,
)
expected_resource = []
if resource_1:
expected_resource.append(resource_1)
if resource_2:
expected_resource.append(resource_2)
if len(expected_resource) == 1:
expected_resource = expected_resource[0]
expected = {
"actor": actor,
"permission": permission,
"resource": expected_resource,
"result": expected_result,
}
assert response.json() == expected
PermMetadataTestCase = collections.namedtuple(
"PermMetadataTestCase",
"metadata,actor,action,resource,expected_result",
)
@pytest.mark.asyncio
@pytest.mark.parametrize(
"metadata,actor,action,resource,expected_result",
(
# Simple view-instance default=True example
PermMetadataTestCase(
metadata={},
actor=None,
action="view-instance",
resource=None,
expected_result=True,
),
# debug-menu on root
PermMetadataTestCase(
metadata={"permissions": {"debug-menu": {"id": "user"}}},
actor={"id": "user"},
action="debug-menu",
resource=None,
expected_result=True,
),
# debug-menu on root, wrong actor
PermMetadataTestCase(
metadata={"permissions": {"debug-menu": {"id": "user"}}},
actor={"id": "user2"},
action="debug-menu",
resource=None,
expected_result=False,
),
# create-table on root
PermMetadataTestCase(
metadata={"permissions": {"create-table": {"id": "user"}}},
actor={"id": "user"},
action="create-table",
resource=None,
expected_result=True,
),
# create-table on database - no resource specified
PermMetadataTestCase(
metadata={
2022-12-12 21:00:40 -08:00
"databases": {
"perms_ds_one": {"permissions": {"create-table": {"id": "user"}}}
}
},
actor={"id": "user"},
action="create-table",
resource=None,
expected_result=False,
),
# create-table on database
PermMetadataTestCase(
metadata={
2022-12-12 21:00:40 -08:00
"databases": {
"perms_ds_one": {"permissions": {"create-table": {"id": "user"}}}
}
},
actor={"id": "user"},
action="create-table",
2022-12-12 21:00:40 -08:00
resource="perms_ds_one",
expected_result=True,
),
# insert-row on root, wrong actor
PermMetadataTestCase(
metadata={"permissions": {"insert-row": {"id": "user"}}},
actor={"id": "user2"},
action="insert-row",
2022-12-12 21:00:40 -08:00
resource=("perms_ds_one", "t1"),
expected_result=False,
),
# insert-row on root, right actor
PermMetadataTestCase(
metadata={"permissions": {"insert-row": {"id": "user"}}},
actor={"id": "user"},
action="insert-row",
2022-12-12 21:00:40 -08:00
resource=("perms_ds_one", "t1"),
expected_result=True,
),
# insert-row on database
PermMetadataTestCase(
metadata={
2022-12-12 21:00:40 -08:00
"databases": {
"perms_ds_one": {"permissions": {"insert-row": {"id": "user"}}}
}
},
actor={"id": "user"},
action="insert-row",
2022-12-12 21:00:40 -08:00
resource="perms_ds_one",
expected_result=True,
),
# insert-row on table, wrong table
PermMetadataTestCase(
metadata={
"databases": {
2022-12-12 21:00:40 -08:00
"perms_ds_one": {
"tables": {
"t1": {"permissions": {"insert-row": {"id": "user"}}}
}
}
}
},
actor={"id": "user"},
action="insert-row",
2022-12-12 21:00:40 -08:00
resource=("perms_ds_one", "t2"),
expected_result=False,
),
# insert-row on table, right table
PermMetadataTestCase(
metadata={
"databases": {
2022-12-12 21:00:40 -08:00
"perms_ds_one": {
"tables": {
"t1": {"permissions": {"insert-row": {"id": "user"}}}
}
}
}
},
actor={"id": "user"},
action="insert-row",
2022-12-12 21:00:40 -08:00
resource=("perms_ds_one", "t1"),
expected_result=True,
),
# view-query on canned query, wrong actor
PermMetadataTestCase(
metadata={
"databases": {
2022-12-12 21:00:40 -08:00
"perms_ds_one": {
"queries": {
"q1": {
"sql": "select 1 + 1",
"permissions": {"view-query": {"id": "user"}},
}
}
}
}
},
actor={"id": "user2"},
action="view-query",
2022-12-12 21:00:40 -08:00
resource=("perms_ds_one", "q1"),
expected_result=False,
),
# view-query on canned query, right actor
PermMetadataTestCase(
metadata={
"databases": {
2022-12-12 21:00:40 -08:00
"perms_ds_one": {
"queries": {
"q1": {
"sql": "select 1 + 1",
"permissions": {"view-query": {"id": "user"}},
}
}
}
}
},
actor={"id": "user"},
action="view-query",
2022-12-12 21:00:40 -08:00
resource=("perms_ds_one", "q1"),
expected_result=True,
),
),
)
async def test_permissions_in_metadata(
perms_ds, metadata, actor, action, resource, expected_result
):
previous_metadata = perms_ds.metadata()
updated_metadata = copy.deepcopy(previous_metadata)
updated_metadata.update(metadata)
perms_ds._metadata_local = updated_metadata
try:
result = await perms_ds.permission_allowed(actor, action, resource)
if result != expected_result:
pprint(perms_ds._permission_checks)
assert result == expected_result
finally:
perms_ds._metadata_local = previous_metadata
@pytest.mark.asyncio
async def test_actor_endpoint_allows_any_token():
ds = Datasette()
token = ds.sign(
{
"a": "root",
"token": "dstok",
"t": int(time.time()),
"_r": {"a": ["debug-menu"]},
},
namespace="token",
)
response = await ds.client.get(
"/-/actor.json", headers={"Authorization": f"Bearer dstok_{token}"}
)
assert response.status_code == 200
assert response.json()["actor"] == {
"id": "root",
"token": "dstok",
"_r": {"a": ["debug-menu"]},
}
@pytest.mark.parametrize(
"options,expected",
(
([], {"id": "root", "token": "dstok"}),
(
["--all", "debug-menu"],
{"_r": {"a": ["dm"]}, "id": "root", "token": "dstok"},
),
(
["-a", "debug-menu", "--all", "create-table"],
{"_r": {"a": ["dm", "ct"]}, "id": "root", "token": "dstok"},
),
(
["-r", "db1", "t1", "insert-row"],
{"_r": {"r": {"db1": {"t1": ["ir"]}}}, "id": "root", "token": "dstok"},
),
(
["-d", "db1", "create-table"],
{"_r": {"d": {"db1": ["ct"]}}, "id": "root", "token": "dstok"},
),
# And one with all of them multiple times using all the names
(
[
"-a",
"debug-menu",
"--all",
"create-table",
"-r",
"db1",
"t1",
"insert-row",
"--resource",
"db1",
"t2",
"update-row",
"-d",
"db1",
"create-table",
"--database",
"db2",
"drop-table",
],
{
"_r": {
"a": ["dm", "ct"],
"d": {"db1": ["ct"], "db2": ["dt"]},
"r": {"db1": {"t1": ["ir"], "t2": ["ur"]}},
},
"id": "root",
"token": "dstok",
},
),
),
)
def test_cli_create_token(options, expected):
runner = CliRunner()
result1 = runner.invoke(
cli,
[
"create-token",
"--secret",
"sekrit",
"root",
]
+ options,
)
token = result1.output.strip()
result2 = runner.invoke(
cli,
[
"serve",
"--secret",
"sekrit",
"--get",
"/-/actor.json",
"--token",
token,
],
)
assert 0 == result2.exit_code, result2.output
assert json.loads(result2.output) == {"actor": expected}