mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
* Neater design for PermissionSQL class, refs #2556 - source is now automatically set to the source plugin - params is optional * PermissionSQL.allow() and PermissionSQL.deny() shortcuts Closes #2556 * Filter out temp database from attached_databases() Refs https://github.com/simonw/datasette/issues/2557#issuecomment-3470510837
612 lines
20 KiB
Python
612 lines
20 KiB
Python
import pytest
|
||
from datasette.app import Datasette
|
||
from datasette.permissions import PermissionSQL
|
||
from datasette.utils.permissions import resolve_permissions_from_catalog
|
||
from typing import Callable, List
|
||
|
||
|
||
@pytest.fixture
|
||
def db():
|
||
ds = Datasette()
|
||
import tempfile
|
||
from datasette.database import Database
|
||
|
||
path = tempfile.mktemp(suffix="demo.db")
|
||
db = ds.add_database(Database(ds, path=path))
|
||
return db
|
||
|
||
|
||
NO_RULES_SQL = (
|
||
"SELECT NULL AS parent, NULL AS child, NULL AS allow, NULL AS reason WHERE 0"
|
||
)
|
||
|
||
|
||
def plugin_allow_all_for_user(user: str) -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT NULL AS parent, NULL AS child, 1 AS allow,
|
||
'global allow for ' || :allow_all_user || ' on ' || :allow_all_action AS reason
|
||
WHERE :actor_id = :allow_all_user
|
||
""",
|
||
{"allow_all_user": user, "allow_all_action": action},
|
||
)
|
||
|
||
return provider
|
||
|
||
|
||
def plugin_deny_specific_table(
|
||
user: str, parent: str, child: str
|
||
) -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT :deny_specific_table_parent AS parent, :deny_specific_table_child AS child, 0 AS allow,
|
||
'deny ' || :deny_specific_table_parent || '/' || :deny_specific_table_child || ' for ' || :deny_specific_table_user || ' on ' || :deny_specific_table_action AS reason
|
||
WHERE :actor_id = :deny_specific_table_user
|
||
""",
|
||
{
|
||
"deny_specific_table_parent": parent,
|
||
"deny_specific_table_child": child,
|
||
"deny_specific_table_user": user,
|
||
"deny_specific_table_action": action,
|
||
},
|
||
)
|
||
|
||
return provider
|
||
|
||
|
||
def plugin_org_policy_deny_parent(parent: str) -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT :org_policy_parent_deny_parent AS parent, NULL AS child, 0 AS allow,
|
||
'org policy: parent ' || :org_policy_parent_deny_parent || ' denied on ' || :org_policy_parent_deny_action AS reason
|
||
""",
|
||
{
|
||
"org_policy_parent_deny_parent": parent,
|
||
"org_policy_parent_deny_action": action,
|
||
},
|
||
)
|
||
|
||
return provider
|
||
|
||
|
||
def plugin_allow_parent_for_user(
|
||
user: str, parent: str
|
||
) -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT :allow_parent_parent AS parent, NULL AS child, 1 AS allow,
|
||
'allow full parent for ' || :allow_parent_user || ' on ' || :allow_parent_action AS reason
|
||
WHERE :actor_id = :allow_parent_user
|
||
""",
|
||
{
|
||
"allow_parent_parent": parent,
|
||
"allow_parent_user": user,
|
||
"allow_parent_action": action,
|
||
},
|
||
)
|
||
|
||
return provider
|
||
|
||
|
||
def plugin_child_allow_for_user(
|
||
user: str, parent: str, child: str
|
||
) -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT :allow_child_parent AS parent, :allow_child_child AS child, 1 AS allow,
|
||
'allow child for ' || :allow_child_user || ' on ' || :allow_child_action AS reason
|
||
WHERE :actor_id = :allow_child_user
|
||
""",
|
||
{
|
||
"allow_child_parent": parent,
|
||
"allow_child_child": child,
|
||
"allow_child_user": user,
|
||
"allow_child_action": action,
|
||
},
|
||
)
|
||
|
||
return provider
|
||
|
||
|
||
def plugin_root_deny_for_all() -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT NULL AS parent, NULL AS child, 0 AS allow, 'root deny for all on ' || :root_deny_action AS reason
|
||
""",
|
||
{"root_deny_action": action},
|
||
)
|
||
|
||
return provider
|
||
|
||
|
||
def plugin_conflicting_same_child_rules(
|
||
user: str, parent: str, child: str
|
||
) -> List[Callable[[str], PermissionSQL]]:
|
||
def allow_provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT :conflict_child_allow_parent AS parent, :conflict_child_allow_child AS child, 1 AS allow,
|
||
'team grant at child for ' || :conflict_child_allow_user || ' on ' || :conflict_child_allow_action AS reason
|
||
WHERE :actor_id = :conflict_child_allow_user
|
||
""",
|
||
{
|
||
"conflict_child_allow_parent": parent,
|
||
"conflict_child_allow_child": child,
|
||
"conflict_child_allow_user": user,
|
||
"conflict_child_allow_action": action,
|
||
},
|
||
)
|
||
|
||
def deny_provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT :conflict_child_deny_parent AS parent, :conflict_child_deny_child AS child, 0 AS allow,
|
||
'exception deny at child for ' || :conflict_child_deny_user || ' on ' || :conflict_child_deny_action AS reason
|
||
WHERE :actor_id = :conflict_child_deny_user
|
||
""",
|
||
{
|
||
"conflict_child_deny_parent": parent,
|
||
"conflict_child_deny_child": child,
|
||
"conflict_child_deny_user": user,
|
||
"conflict_child_deny_action": action,
|
||
},
|
||
)
|
||
|
||
return [allow_provider, deny_provider]
|
||
|
||
|
||
def plugin_allow_all_for_action(
|
||
user: str, allowed_action: str
|
||
) -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
if action != allowed_action:
|
||
return PermissionSQL(NO_RULES_SQL)
|
||
# Sanitize parameter names by replacing hyphens with underscores
|
||
param_prefix = action.replace("-", "_")
|
||
return PermissionSQL(
|
||
f"""
|
||
SELECT NULL AS parent, NULL AS child, 1 AS allow,
|
||
'global allow for ' || :{param_prefix}_user || ' on ' || :{param_prefix}_action AS reason
|
||
WHERE :actor_id = :{param_prefix}_user
|
||
""",
|
||
{f"{param_prefix}_user": user, f"{param_prefix}_action": action},
|
||
)
|
||
|
||
return provider
|
||
|
||
|
||
VIEW_TABLE = "view-table"
|
||
|
||
|
||
# ---------- Catalog DDL (from your schema) ----------
|
||
CATALOG_DDL = """
|
||
CREATE TABLE IF NOT EXISTS catalog_databases (
|
||
database_name TEXT PRIMARY KEY,
|
||
path TEXT,
|
||
is_memory INTEGER,
|
||
schema_version INTEGER
|
||
);
|
||
CREATE TABLE IF NOT EXISTS catalog_tables (
|
||
database_name TEXT,
|
||
table_name TEXT,
|
||
rootpage INTEGER,
|
||
sql TEXT,
|
||
PRIMARY KEY (database_name, table_name),
|
||
FOREIGN KEY (database_name) REFERENCES catalog_databases(database_name)
|
||
);
|
||
"""
|
||
|
||
PARENTS = ["accounting", "hr", "analytics"]
|
||
SPECIALS = {"accounting": ["sales"], "analytics": ["secret"], "hr": []}
|
||
|
||
TABLE_CANDIDATES_SQL = (
|
||
"SELECT database_name AS parent, table_name AS child FROM catalog_tables"
|
||
)
|
||
PARENT_CANDIDATES_SQL = (
|
||
"SELECT database_name AS parent, NULL AS child FROM catalog_databases"
|
||
)
|
||
|
||
|
||
# ---------- Helpers ----------
|
||
async def seed_catalog(db, per_parent: int = 10) -> None:
|
||
await db.execute_write_script(CATALOG_DDL)
|
||
# databases
|
||
db_rows = [(p, f"/{p}.db", 0, 1) for p in PARENTS]
|
||
await db.execute_write_many(
|
||
"INSERT OR REPLACE INTO catalog_databases(database_name, path, is_memory, schema_version) VALUES (?,?,?,?)",
|
||
db_rows,
|
||
)
|
||
|
||
# tables
|
||
def tables_for(parent: str, n: int):
|
||
base = [f"table{i:02d}" for i in range(1, n + 1)]
|
||
for s in SPECIALS.get(parent, []):
|
||
if s not in base:
|
||
base[0] = s
|
||
return base
|
||
|
||
table_rows = []
|
||
for p in PARENTS:
|
||
for t in tables_for(p, per_parent):
|
||
table_rows.append((p, t, 0, f"CREATE TABLE {t} (id INTEGER PRIMARY KEY)"))
|
||
await db.execute_write_many(
|
||
"INSERT OR REPLACE INTO catalog_tables(database_name, table_name, rootpage, sql) VALUES (?,?,?,?)",
|
||
table_rows,
|
||
)
|
||
|
||
|
||
def res_allowed(rows, parent=None):
|
||
return sorted(
|
||
r["resource"]
|
||
for r in rows
|
||
if r["allow"] == 1 and (parent is None or r["parent"] == parent)
|
||
)
|
||
|
||
|
||
def res_denied(rows, parent=None):
|
||
return sorted(
|
||
r["resource"]
|
||
for r in rows
|
||
if r["allow"] == 0 and (parent is None or r["parent"] == parent)
|
||
)
|
||
|
||
|
||
# ---------- Tests ----------
|
||
@pytest.mark.asyncio
|
||
async def test_alice_global_allow_with_specific_denies_catalog(db):
|
||
await seed_catalog(db)
|
||
plugins = [
|
||
plugin_allow_all_for_user("alice"),
|
||
plugin_deny_specific_table("alice", "accounting", "sales"),
|
||
plugin_org_policy_deny_parent("hr"),
|
||
]
|
||
rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "alice"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=True,
|
||
)
|
||
# Alice can see everything except accounting/sales and hr/*
|
||
assert "/accounting/sales" in res_denied(rows)
|
||
for r in rows:
|
||
if r["parent"] == "hr":
|
||
assert r["allow"] == 0
|
||
elif r["resource"] == "/accounting/sales":
|
||
assert r["allow"] == 0
|
||
else:
|
||
assert r["allow"] == 1
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_carol_parent_allow_but_child_conflict_deny_wins_catalog(db):
|
||
await seed_catalog(db)
|
||
plugins = [
|
||
plugin_org_policy_deny_parent("hr"),
|
||
plugin_allow_parent_for_user("carol", "analytics"),
|
||
*plugin_conflicting_same_child_rules("carol", "analytics", "secret"),
|
||
]
|
||
rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "carol"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=True,
|
||
)
|
||
allowed_analytics = res_allowed(rows, parent="analytics")
|
||
denied_analytics = res_denied(rows, parent="analytics")
|
||
|
||
assert "/analytics/secret" in denied_analytics
|
||
# 10 analytics children total, 1 denied
|
||
assert len(allowed_analytics) == 9
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_specificity_child_allow_overrides_parent_deny_catalog(db):
|
||
await seed_catalog(db)
|
||
plugins = [
|
||
plugin_allow_all_for_user("alice"),
|
||
plugin_org_policy_deny_parent("analytics"), # parent-level deny
|
||
plugin_child_allow_for_user(
|
||
"alice", "analytics", "table02"
|
||
), # child allow beats parent deny
|
||
]
|
||
rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "alice"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=True,
|
||
)
|
||
|
||
# table02 allowed, other analytics tables denied
|
||
assert any(r["resource"] == "/analytics/table02" and r["allow"] == 1 for r in rows)
|
||
assert all(
|
||
(r["parent"] != "analytics" or r["child"] == "table02" or r["allow"] == 0)
|
||
for r in rows
|
||
)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_root_deny_all_but_parent_allow_rescues_specific_parent_catalog(db):
|
||
await seed_catalog(db)
|
||
plugins = [
|
||
plugin_root_deny_for_all(), # root deny
|
||
plugin_allow_parent_for_user(
|
||
"bob", "accounting"
|
||
), # parent allow (more specific)
|
||
]
|
||
rows = await resolve_permissions_from_catalog(
|
||
db, {"id": "bob"}, plugins, VIEW_TABLE, TABLE_CANDIDATES_SQL, implicit_deny=True
|
||
)
|
||
for r in rows:
|
||
if r["parent"] == "accounting":
|
||
assert r["allow"] == 1
|
||
else:
|
||
assert r["allow"] == 0
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_parent_scoped_candidates(db):
|
||
await seed_catalog(db)
|
||
plugins = [
|
||
plugin_org_policy_deny_parent("hr"),
|
||
plugin_allow_parent_for_user("carol", "analytics"),
|
||
]
|
||
rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "carol"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
PARENT_CANDIDATES_SQL,
|
||
implicit_deny=True,
|
||
)
|
||
d = {r["resource"]: r["allow"] for r in rows}
|
||
assert d["/analytics"] == 1
|
||
assert d["/hr"] == 0
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_implicit_deny_behavior(db):
|
||
await seed_catalog(db)
|
||
plugins = [] # no rules at all
|
||
|
||
# implicit_deny=True -> everything denied with reason 'implicit deny'
|
||
rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "erin"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=True,
|
||
)
|
||
assert all(r["allow"] == 0 and r["reason"] == "implicit deny" for r in rows)
|
||
|
||
# implicit_deny=False -> no winner => allow is None, reason is None
|
||
rows2 = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "erin"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=False,
|
||
)
|
||
assert all(r["allow"] is None and r["reason"] is None for r in rows2)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_candidate_filters_via_params(db):
|
||
await seed_catalog(db)
|
||
# Add some metadata to test filtering
|
||
# Mark 'hr' as is_memory=1 and increment analytics schema_version
|
||
await db.execute_write(
|
||
"UPDATE catalog_databases SET is_memory=1 WHERE database_name='hr'"
|
||
)
|
||
await db.execute_write(
|
||
"UPDATE catalog_databases SET schema_version=2 WHERE database_name='analytics'"
|
||
)
|
||
|
||
# Candidate SQL that filters by db metadata via params
|
||
candidate_sql = """
|
||
SELECT t.database_name AS parent, t.table_name AS child
|
||
FROM catalog_tables t
|
||
JOIN catalog_databases d ON d.database_name = t.database_name
|
||
WHERE (:exclude_memory = 1 AND d.is_memory = 1) IS NOT 1
|
||
AND (:min_schema_version IS NULL OR d.schema_version >= :min_schema_version)
|
||
"""
|
||
|
||
plugins = [
|
||
plugin_root_deny_for_all(),
|
||
plugin_allow_parent_for_user(
|
||
"dev", "analytics"
|
||
), # analytics rescued if included by candidates
|
||
]
|
||
|
||
# Case 1: exclude memory dbs, require schema_version >= 2 -> only analytics appear, and thus are allowed
|
||
rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "dev"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
candidate_sql,
|
||
candidate_params={"exclude_memory": 1, "min_schema_version": 2},
|
||
implicit_deny=True,
|
||
)
|
||
assert rows and all(r["parent"] == "analytics" for r in rows)
|
||
assert all(r["allow"] == 1 for r in rows)
|
||
|
||
# Case 2: include memory dbs, min_schema_version = None -> accounting/hr/analytics appear,
|
||
# but root deny wins except where specifically allowed (none except analytics parent allow doesn’t apply to table depth if candidate includes children; still fine—policy is explicit).
|
||
rows2 = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "dev"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
candidate_sql,
|
||
candidate_params={"exclude_memory": 0, "min_schema_version": None},
|
||
implicit_deny=True,
|
||
)
|
||
assert any(r["parent"] == "accounting" for r in rows2)
|
||
assert any(r["parent"] == "hr" for r in rows2)
|
||
# For table-scoped candidates, the parent-level allow does not override root deny unless you have child-level rules
|
||
assert all(r["allow"] in (0, 1) for r in rows2)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_action_specific_rules(db):
|
||
await seed_catalog(db)
|
||
plugins = [plugin_allow_all_for_action("dana", VIEW_TABLE)]
|
||
|
||
view_rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "dana"},
|
||
plugins,
|
||
VIEW_TABLE,
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=True,
|
||
)
|
||
assert view_rows and all(r["allow"] == 1 for r in view_rows)
|
||
assert all(r["action"] == VIEW_TABLE for r in view_rows)
|
||
|
||
insert_rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "dana"},
|
||
plugins,
|
||
"insert-row",
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=True,
|
||
)
|
||
assert insert_rows and all(r["allow"] == 0 for r in insert_rows)
|
||
assert all(r["reason"] == "implicit deny" for r in insert_rows)
|
||
assert all(r["action"] == "insert-row" for r in insert_rows)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_actor_actor_id_action_parameters_available(db):
|
||
"""Test that :actor (JSON), :actor_id, and :action are all available in SQL"""
|
||
await seed_catalog(db)
|
||
|
||
def plugin_using_all_parameters() -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT NULL AS parent, NULL AS child, 1 AS allow,
|
||
'Actor ID: ' || COALESCE(:actor_id, 'null') ||
|
||
', Actor JSON: ' || COALESCE(:actor, 'null') ||
|
||
', Action: ' || :action AS reason
|
||
WHERE :actor_id = 'test_user' AND :action = 'view-table'
|
||
AND json_extract(:actor, '$.role') = 'admin'
|
||
"""
|
||
)
|
||
|
||
return provider
|
||
|
||
plugins = [plugin_using_all_parameters()]
|
||
|
||
# Test with full actor dict
|
||
rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "test_user", "role": "admin"},
|
||
plugins,
|
||
"view-table",
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=True,
|
||
)
|
||
|
||
# Should have allowed rows with reason containing all the info
|
||
allowed = [r for r in rows if r["allow"] == 1]
|
||
assert len(allowed) > 0
|
||
|
||
# Check that the reason string contains evidence of all parameters
|
||
reason = allowed[0]["reason"]
|
||
assert "test_user" in reason
|
||
assert "view-table" in reason
|
||
# The :actor parameter should be the JSON string
|
||
assert "Actor JSON:" in reason
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_multiple_plugins_with_own_parameters(db):
|
||
"""
|
||
Test that multiple plugins can use their own parameter names without conflict.
|
||
|
||
This verifies that the parameter naming convention works: plugins prefix their
|
||
parameters (e.g., :plugin1_pattern, :plugin2_message) and both sets of parameters
|
||
are successfully bound in the SQL queries.
|
||
"""
|
||
await seed_catalog(db)
|
||
|
||
def plugin_one() -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
if action != "view-table":
|
||
return PermissionSQL("plugin_one", "SELECT NULL WHERE 0", {})
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT database_name AS parent, table_name AS child,
|
||
1 AS allow, 'Plugin one used param: ' || :plugin1_param AS reason
|
||
FROM catalog_tables
|
||
WHERE database_name = 'accounting'
|
||
""",
|
||
{
|
||
"plugin1_param": "value1",
|
||
},
|
||
)
|
||
|
||
return provider
|
||
|
||
def plugin_two() -> Callable[[str], PermissionSQL]:
|
||
def provider(action: str) -> PermissionSQL:
|
||
if action != "view-table":
|
||
return PermissionSQL("plugin_two", "SELECT NULL WHERE 0", {})
|
||
return PermissionSQL(
|
||
"""
|
||
SELECT database_name AS parent, table_name AS child,
|
||
1 AS allow, 'Plugin two used param: ' || :plugin2_param AS reason
|
||
FROM catalog_tables
|
||
WHERE database_name = 'hr'
|
||
""",
|
||
{
|
||
"plugin2_param": "value2",
|
||
},
|
||
)
|
||
|
||
return provider
|
||
|
||
plugins = [plugin_one(), plugin_two()]
|
||
|
||
rows = await resolve_permissions_from_catalog(
|
||
db,
|
||
{"id": "test_user"},
|
||
plugins,
|
||
"view-table",
|
||
TABLE_CANDIDATES_SQL,
|
||
implicit_deny=False,
|
||
)
|
||
|
||
# Both plugins should contribute results with their parameters successfully bound
|
||
plugin_one_rows = [
|
||
r for r in rows if r.get("reason") and "Plugin one" in r["reason"]
|
||
]
|
||
plugin_two_rows = [
|
||
r for r in rows if r.get("reason") and "Plugin two" in r["reason"]
|
||
]
|
||
|
||
assert len(plugin_one_rows) > 0, "Plugin one should contribute rules"
|
||
assert len(plugin_two_rows) > 0, "Plugin two should contribute rules"
|
||
|
||
# Verify each plugin's parameters were successfully bound in the SQL
|
||
assert any(
|
||
"value1" in r.get("reason", "") for r in plugin_one_rows
|
||
), "Plugin one's :plugin1_param should be bound"
|
||
assert any(
|
||
"value2" in r.get("reason", "") for r in plugin_two_rows
|
||
), "Plugin two's :plugin2_param should be bound"
|