mirror of
https://github.com/simonw/datasette.git
synced 2026-06-23 09:14:34 +02:00
Returns a list of tables with a single primary key, and for each one the name of that primary key column and its SQLite type affinity. This will be used by the create table UI to suggest foreign keys.
2310 lines
70 KiB
Python
2310 lines
70 KiB
Python
from datasette.app import Datasette
|
|
from datasette.utils import sqlite3
|
|
from .utils import last_event
|
|
import pytest
|
|
import time
|
|
|
|
|
|
@pytest.fixture
|
|
def ds_write(tmp_path_factory):
|
|
db_directory = tmp_path_factory.mktemp("dbs")
|
|
db_path = str(db_directory / "data.db")
|
|
db_path_immutable = str(db_directory / "immutable.db")
|
|
db1 = sqlite3.connect(str(db_path))
|
|
db2 = sqlite3.connect(str(db_path_immutable))
|
|
for db in (db1, db2):
|
|
db.execute("vacuum")
|
|
db.execute(
|
|
"create table docs (id integer primary key, title text, score float, age integer)"
|
|
)
|
|
db1.close()
|
|
db2.close()
|
|
ds = Datasette([db_path], immutables=[db_path_immutable])
|
|
ds.root_enabled = True
|
|
yield ds
|
|
ds.close()
|
|
|
|
|
|
def write_token(ds, actor_id="root", permissions=None):
|
|
to_sign = {"a": actor_id, "token": "dstok", "t": int(time.time())}
|
|
if permissions:
|
|
to_sign["_r"] = {"a": permissions}
|
|
return "dstok_{}".format(ds.sign(to_sign, namespace="token"))
|
|
|
|
|
|
def _headers(token):
|
|
return {
|
|
"Authorization": "Bearer {}".format(token),
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_api_explorer_upsert_example_json(ds_write):
|
|
response = await ds_write.client.get("/-/api", actor={"id": "root"})
|
|
assert response.status_code == 200
|
|
import urllib.parse
|
|
|
|
text = urllib.parse.unquote_plus(response.text)
|
|
upsert_idx = text.index("/data/docs/-/upsert")
|
|
upsert_chunk = text[upsert_idx : upsert_idx + 500]
|
|
assert '"id": "<id (primary key)>"' in upsert_chunk
|
|
assert '"title": "<title>"' in upsert_chunk
|
|
assert '"score": "<score>"' in upsert_chunk
|
|
assert '"age": "<age>"' in upsert_chunk
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_api_explorer_upsert_example_json_rowid_table(tmp_path_factory):
|
|
db_path = str(tmp_path_factory.mktemp("dbs") / "data.db")
|
|
conn = sqlite3.connect(db_path)
|
|
conn.execute("create table things (title text, score float)")
|
|
conn.close()
|
|
ds = Datasette([db_path])
|
|
ds.root_enabled = True
|
|
response = await ds.client.get("/-/api", actor={"id": "root"})
|
|
assert response.status_code == 200
|
|
import urllib.parse
|
|
|
|
text = urllib.parse.unquote_plus(response.text)
|
|
upsert_idx = text.index("/data/things/-/upsert")
|
|
upsert_chunk = text[upsert_idx : upsert_idx + 500]
|
|
assert '"rowid": "<rowid (primary key)>"' in upsert_chunk
|
|
assert '"title": "<title>"' in upsert_chunk
|
|
assert '"score": "<score>"' in upsert_chunk
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"content_type",
|
|
(
|
|
"application/json",
|
|
"application/json; charset=utf-8",
|
|
),
|
|
)
|
|
async def test_insert_row(ds_write, content_type):
|
|
token = write_token(ds_write)
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/insert",
|
|
json={"row": {"title": "Test", "score": 1.2, "age": 5}},
|
|
headers={
|
|
"Authorization": "Bearer {}".format(token),
|
|
"Content-Type": content_type,
|
|
},
|
|
)
|
|
expected_row = {"id": 1, "title": "Test", "score": 1.2, "age": 5}
|
|
assert response.status_code == 201
|
|
assert response.json()["ok"] is True
|
|
assert response.json()["rows"] == [expected_row]
|
|
rows = (await ds_write.get_database("data").execute("select * from docs")).dicts()
|
|
assert rows[0] == expected_row
|
|
# Analytics event
|
|
event = last_event(ds_write)
|
|
assert event.name == "insert-rows"
|
|
assert event.num_rows == 1
|
|
assert event.database == "data"
|
|
assert event.table == "docs"
|
|
assert not event.ignore
|
|
assert not event.replace
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_insert_row_alter(ds_write):
|
|
token = write_token(ds_write)
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/insert",
|
|
json={
|
|
"row": {"title": "Test", "score": 1.2, "age": 5, "extra": "extra"},
|
|
"alter": True,
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 201
|
|
assert response.json()["ok"] is True
|
|
assert response.json()["rows"][0]["extra"] == "extra"
|
|
# Analytics event
|
|
event = last_event(ds_write)
|
|
assert event.name == "alter-table"
|
|
assert "extra" not in event.before_schema
|
|
assert "extra" in event.after_schema
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("return_rows", (True, False))
|
|
async def test_insert_rows(ds_write, return_rows):
|
|
token = write_token(ds_write)
|
|
data = {
|
|
"rows": [
|
|
{"title": "Test {}".format(i), "score": 1.0, "age": 5} for i in range(20)
|
|
]
|
|
}
|
|
if return_rows:
|
|
data["return"] = True
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/insert",
|
|
json=data,
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
# Analytics event
|
|
event = last_event(ds_write)
|
|
assert event.name == "insert-rows"
|
|
assert event.num_rows == 20
|
|
assert event.database == "data"
|
|
assert event.table == "docs"
|
|
assert not event.ignore
|
|
assert not event.replace
|
|
|
|
actual_rows = (
|
|
await ds_write.get_database("data").execute("select * from docs")
|
|
).dicts()
|
|
assert len(actual_rows) == 20
|
|
assert actual_rows == [
|
|
{"id": i + 1, "title": "Test {}".format(i), "score": 1.0, "age": 5}
|
|
for i in range(20)
|
|
]
|
|
assert response.json()["ok"] is True
|
|
if return_rows:
|
|
assert response.json()["rows"] == actual_rows
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"path,input,special_case,expected_status,expected_errors",
|
|
(
|
|
(
|
|
"/data2/docs/-/insert",
|
|
{},
|
|
None,
|
|
404,
|
|
["Database not found"],
|
|
),
|
|
(
|
|
"/data/docs2/-/insert",
|
|
{},
|
|
None,
|
|
404,
|
|
["Table not found"],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": [{"title": "Test"} for i in range(10)]},
|
|
"bad_token",
|
|
403,
|
|
["Permission denied"],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{},
|
|
"invalid_json",
|
|
400,
|
|
[
|
|
"Invalid JSON: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)"
|
|
],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{},
|
|
"invalid_content_type",
|
|
400,
|
|
["Invalid content-type, must be application/json"],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
[],
|
|
None,
|
|
400,
|
|
["JSON must be a dictionary"],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"row": "blah"},
|
|
None,
|
|
400,
|
|
['"row" must be a dictionary'],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"blah": "blah"},
|
|
None,
|
|
400,
|
|
['JSON must have one or other of "row" or "rows"'],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": "blah"},
|
|
None,
|
|
400,
|
|
['"rows" must be a list'],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": ["blah"]},
|
|
None,
|
|
400,
|
|
['"rows" must be a list of dictionaries'],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": [{"title": "Test"} for i in range(101)]},
|
|
None,
|
|
400,
|
|
["Too many rows, maximum allowed is 100"],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": [{"id": 1, "title": "Test"}, {"id": 2, "title": "Test"}]},
|
|
"duplicate_id",
|
|
400,
|
|
["UNIQUE constraint failed: docs.id"],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": [{"title": "Test"}], "ignore": True, "replace": True},
|
|
None,
|
|
400,
|
|
['Cannot use "ignore" and "replace" at the same time'],
|
|
),
|
|
(
|
|
# Replace is not allowed if you don't have update-row
|
|
"/data/docs/-/insert",
|
|
{"rows": [{"title": "Test"}], "replace": True},
|
|
"insert-but-not-update",
|
|
403,
|
|
['Permission denied: need update-row to use "replace"'],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": [{"title": "Test"}], "invalid_param": True},
|
|
None,
|
|
400,
|
|
['Invalid parameter: "invalid_param"'],
|
|
),
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": [{"title": "Test"}], "one": True, "two": True},
|
|
None,
|
|
400,
|
|
['Invalid parameter: "one", "two"'],
|
|
),
|
|
(
|
|
"/immutable/docs/-/insert",
|
|
{"rows": [{"title": "Test"}]},
|
|
None,
|
|
403,
|
|
["Database is immutable"],
|
|
),
|
|
# Validate columns of each row
|
|
(
|
|
"/data/docs/-/insert",
|
|
{"rows": [{"title": "Test", "bad": 1, "worse": 2} for i in range(2)]},
|
|
None,
|
|
400,
|
|
[
|
|
"Row 0 has invalid columns: bad, worse",
|
|
"Row 1 has invalid columns: bad, worse",
|
|
],
|
|
),
|
|
## UPSERT ERRORS:
|
|
(
|
|
"/immutable/docs/-/upsert",
|
|
{"rows": [{"title": "Test"}]},
|
|
None,
|
|
403,
|
|
["Database is immutable"],
|
|
),
|
|
(
|
|
"/data/badtable/-/upsert",
|
|
{"rows": [{"title": "Test"}]},
|
|
None,
|
|
404,
|
|
["Table not found"],
|
|
),
|
|
# missing primary key
|
|
(
|
|
"/data/docs/-/upsert",
|
|
{"rows": [{"title": "Missing PK"}]},
|
|
None,
|
|
400,
|
|
['Row 0 is missing primary key column(s): "id"'],
|
|
),
|
|
# null primary key
|
|
(
|
|
"/data/docs/-/upsert",
|
|
{"rows": [{"id": None, "title": "Null PK"}]},
|
|
None,
|
|
400,
|
|
['Row 0 has null primary key column(s): "id"'],
|
|
),
|
|
# Upsert does not support ignore or replace
|
|
(
|
|
"/data/docs/-/upsert",
|
|
{"rows": [{"id": 1, "title": "Bad"}], "ignore": True},
|
|
None,
|
|
400,
|
|
["Upsert does not support ignore or replace"],
|
|
),
|
|
# Upsert permissions
|
|
(
|
|
"/data/docs/-/upsert",
|
|
{"rows": [{"id": 1, "title": "Disallowed"}]},
|
|
"insert-but-not-update",
|
|
403,
|
|
["Permission denied: need both insert-row and update-row"],
|
|
),
|
|
(
|
|
"/data/docs/-/upsert",
|
|
{"rows": [{"id": 1, "title": "Disallowed"}]},
|
|
"update-but-not-insert",
|
|
403,
|
|
["Permission denied: need both insert-row and update-row"],
|
|
),
|
|
# Alter table forbidden without alter permission
|
|
(
|
|
"/data/docs/-/upsert",
|
|
{"rows": [{"id": 1, "title": "One", "extra": "extra"}], "alter": True},
|
|
"update-and-insert-but-no-alter",
|
|
403,
|
|
["Permission denied for alter-table"],
|
|
),
|
|
),
|
|
)
|
|
async def test_insert_or_upsert_row_errors(
|
|
ds_write, path, input, special_case, expected_status, expected_errors
|
|
):
|
|
token_permissions = []
|
|
if special_case == "insert-but-not-update":
|
|
token_permissions = ["ir", "vi"]
|
|
if special_case == "update-but-not-insert":
|
|
token_permissions = ["ur", "vi"]
|
|
if special_case == "update-and-insert-but-no-alter":
|
|
token_permissions = ["ur", "ir"]
|
|
token = write_token(ds_write, permissions=token_permissions)
|
|
if special_case == "duplicate_id":
|
|
await ds_write.get_database("data").execute_write(
|
|
"insert into docs (id) values (1)"
|
|
)
|
|
if special_case == "bad_token":
|
|
token += "bad"
|
|
kwargs = dict(
|
|
json=input,
|
|
headers={
|
|
"Authorization": "Bearer {}".format(token),
|
|
"Content-Type": (
|
|
"text/plain"
|
|
if special_case == "invalid_content_type"
|
|
else "application/json"
|
|
),
|
|
},
|
|
)
|
|
|
|
actor_response = (
|
|
await ds_write.client.get("/-/actor.json", headers=kwargs["headers"])
|
|
).json()
|
|
assert set((actor_response["actor"] or {}).get("_r", {}).get("a") or []) == set(
|
|
token_permissions
|
|
)
|
|
|
|
if special_case == "invalid_json":
|
|
del kwargs["json"]
|
|
kwargs["content"] = "{bad json"
|
|
before_count = (
|
|
await ds_write.get_database("data").execute("select count(*) from docs")
|
|
).rows[0][0] == 0
|
|
response = await ds_write.client.post(
|
|
path,
|
|
**kwargs,
|
|
)
|
|
assert response.status_code == expected_status
|
|
assert response.json()["ok"] is False
|
|
assert response.json()["errors"] == expected_errors
|
|
# Check that no rows were inserted
|
|
after_count = (
|
|
await ds_write.get_database("data").execute("select count(*) from docs")
|
|
).rows[0][0] == 0
|
|
assert before_count == after_count
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("allowed", (True, False))
|
|
async def test_upsert_permissions_per_table(ds_write, allowed):
|
|
# https://github.com/simonw/datasette/issues/2262
|
|
token = "dstok_{}".format(
|
|
ds_write.sign(
|
|
{
|
|
"a": "root",
|
|
"token": "dstok",
|
|
"t": int(time.time()),
|
|
"_r": {
|
|
"r": {
|
|
"data": {
|
|
"docs" if allowed else "other": ["ir", "ur"],
|
|
}
|
|
}
|
|
},
|
|
},
|
|
namespace="token",
|
|
)
|
|
)
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/upsert",
|
|
json={"rows": [{"id": 1, "title": "One"}]},
|
|
headers={
|
|
"Authorization": "Bearer {}".format(token),
|
|
},
|
|
)
|
|
if allowed:
|
|
assert response.status_code == 200
|
|
assert response.json()["ok"] is True
|
|
else:
|
|
assert response.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"ignore,replace,expected_rows",
|
|
(
|
|
(
|
|
True,
|
|
False,
|
|
[
|
|
{"id": 1, "title": "Exists", "score": None, "age": None},
|
|
],
|
|
),
|
|
(
|
|
False,
|
|
True,
|
|
[
|
|
{"id": 1, "title": "One", "score": None, "age": None},
|
|
],
|
|
),
|
|
),
|
|
)
|
|
@pytest.mark.parametrize("should_return", (True, False))
|
|
async def test_insert_ignore_replace(
|
|
ds_write, ignore, replace, expected_rows, should_return
|
|
):
|
|
await ds_write.get_database("data").execute_write(
|
|
"insert into docs (id, title) values (1, 'Exists')"
|
|
)
|
|
token = write_token(ds_write)
|
|
data = {"rows": [{"id": 1, "title": "One"}]}
|
|
if ignore:
|
|
data["ignore"] = True
|
|
if replace:
|
|
data["replace"] = True
|
|
if should_return:
|
|
data["return"] = True
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/insert",
|
|
json=data,
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
# Analytics event
|
|
event = last_event(ds_write)
|
|
assert event.name == "insert-rows"
|
|
assert event.num_rows == 1
|
|
assert event.database == "data"
|
|
assert event.table == "docs"
|
|
assert event.ignore == ignore
|
|
assert event.replace == replace
|
|
|
|
actual_rows = (
|
|
await ds_write.get_database("data").execute("select * from docs")
|
|
).dicts()
|
|
|
|
assert actual_rows == expected_rows
|
|
assert response.json()["ok"] is True
|
|
if should_return:
|
|
assert response.json()["rows"] == expected_rows
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"initial,input,expected_rows",
|
|
(
|
|
(
|
|
# Simple primary key update
|
|
{"rows": [{"id": 1, "title": "One"}], "pk": "id"},
|
|
{"rows": [{"id": 1, "title": "Two"}]},
|
|
[
|
|
{"id": 1, "title": "Two"},
|
|
],
|
|
),
|
|
(
|
|
# Multiple rows update one of them
|
|
{
|
|
"rows": [{"id": 1, "title": "One"}, {"id": 2, "title": "Two"}],
|
|
"pk": "id",
|
|
},
|
|
{"rows": [{"id": 1, "title": "Three"}]},
|
|
[
|
|
{"id": 1, "title": "Three"},
|
|
{"id": 2, "title": "Two"},
|
|
],
|
|
),
|
|
(
|
|
# rowid update
|
|
{"rows": [{"title": "One"}]},
|
|
{"rows": [{"rowid": 1, "title": "Two"}]},
|
|
[
|
|
{"rowid": 1, "title": "Two"},
|
|
],
|
|
),
|
|
(
|
|
# Compound primary key update
|
|
{"rows": [{"id": 1, "title": "One", "score": 1}], "pks": ["id", "score"]},
|
|
{"rows": [{"id": 1, "title": "Two", "score": 1}]},
|
|
[
|
|
{"id": 1, "title": "Two", "score": 1},
|
|
],
|
|
),
|
|
(
|
|
# Upsert with an alter
|
|
{"rows": [{"id": 1, "title": "One"}], "pk": "id"},
|
|
{"rows": [{"id": 1, "title": "Two", "extra": "extra"}], "alter": True},
|
|
[{"id": 1, "title": "Two", "extra": "extra"}],
|
|
),
|
|
),
|
|
)
|
|
@pytest.mark.parametrize("should_return", (False, True))
|
|
async def test_upsert(ds_write, initial, input, expected_rows, should_return):
|
|
token = write_token(ds_write)
|
|
# Insert initial data
|
|
initial["table"] = "upsert_test"
|
|
create_response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json=initial,
|
|
headers=_headers(token),
|
|
)
|
|
assert create_response.status_code == 201
|
|
if should_return:
|
|
input["return"] = True
|
|
response = await ds_write.client.post(
|
|
"/data/upsert_test/-/upsert",
|
|
json=input,
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
assert response.json()["ok"] is True
|
|
|
|
# Analytics event
|
|
event = last_event(ds_write)
|
|
assert event.database == "data"
|
|
assert event.table == "upsert_test"
|
|
if input.get("alter"):
|
|
assert event.name == "alter-table"
|
|
assert "extra" in event.after_schema
|
|
else:
|
|
assert event.name == "upsert-rows"
|
|
assert event.num_rows == 1
|
|
|
|
if should_return:
|
|
# We only expect it to return rows corresponding to those we sent
|
|
expected_returned_rows = expected_rows[: len(input["rows"])]
|
|
assert response.json()["rows"] == expected_returned_rows
|
|
# Check the database too
|
|
actual_rows = (
|
|
await ds_write.client.get("/data/upsert_test.json?_shape=array")
|
|
).json()
|
|
assert actual_rows == expected_rows
|
|
# Drop the upsert_test table
|
|
await ds_write.get_database("data").execute_write("drop table upsert_test")
|
|
|
|
|
|
async def _insert_row(ds):
|
|
insert_response = await ds.client.post(
|
|
"/data/docs/-/insert",
|
|
json={"row": {"title": "Row one", "score": 1.2, "age": 5}, "return": True},
|
|
headers=_headers(write_token(ds)),
|
|
)
|
|
assert insert_response.status_code == 201
|
|
return insert_response.json()["rows"][0]["id"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("scenario", ("no_token", "no_perm", "bad_table"))
|
|
async def test_delete_row_errors(ds_write, scenario):
|
|
if scenario == "no_token":
|
|
token = "bad_token"
|
|
elif scenario == "no_perm":
|
|
token = write_token(ds_write, actor_id="not-root")
|
|
else:
|
|
token = write_token(ds_write)
|
|
|
|
pk = await _insert_row(ds_write)
|
|
|
|
path = "/data/{}/{}/-/delete".format(
|
|
"docs" if scenario != "bad_table" else "bad_table", pk
|
|
)
|
|
response = await ds_write.client.post(
|
|
path,
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 403 if scenario in ("no_token", "bad_token") else 404
|
|
assert response.json()["ok"] is False
|
|
assert (
|
|
response.json()["errors"] == ["Permission denied"]
|
|
if scenario == "no_token"
|
|
else ["Table not found"]
|
|
)
|
|
assert len((await ds_write.client.get("/data/docs.json?_shape=array")).json()) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"table,row_for_create,pks,delete_path",
|
|
(
|
|
("rowid_table", {"name": "rowid row"}, None, None),
|
|
("pk_table", {"id": 1, "name": "ID table"}, "id", "1"),
|
|
(
|
|
"compound_pk_table",
|
|
{"type": "article", "key": "k"},
|
|
["type", "key"],
|
|
"article,k",
|
|
),
|
|
),
|
|
)
|
|
async def test_delete_row(ds_write, table, row_for_create, pks, delete_path):
|
|
# First create the table with that example row
|
|
create_data = {
|
|
"table": table,
|
|
"row": row_for_create,
|
|
}
|
|
if pks:
|
|
if isinstance(pks, str):
|
|
create_data["pk"] = pks
|
|
else:
|
|
create_data["pks"] = pks
|
|
create_response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json=create_data,
|
|
headers=_headers(write_token(ds_write)),
|
|
)
|
|
assert create_response.status_code == 201, create_response.json()
|
|
# Should be a single row
|
|
assert (
|
|
await ds_write.client.get(
|
|
"/data/-/query.json?_shape=arrayfirst&sql=select+count(*)+from+{}".format(
|
|
table
|
|
)
|
|
)
|
|
).json() == [1]
|
|
# Now delete the row
|
|
if delete_path is None:
|
|
# Special case for that rowid table
|
|
delete_path = (
|
|
await ds_write.client.get(
|
|
"/data/-/query.json?_shape=arrayfirst&sql=select+rowid+from+{}".format(
|
|
table
|
|
)
|
|
)
|
|
).json()[0]
|
|
|
|
delete_response = await ds_write.client.post(
|
|
"/data/{}/{}/-/delete".format(table, delete_path),
|
|
headers=_headers(write_token(ds_write)),
|
|
)
|
|
assert delete_response.status_code == 200
|
|
|
|
# Analytics event
|
|
event = last_event(ds_write)
|
|
assert event.name == "delete-row"
|
|
assert event.database == "data"
|
|
assert event.table == table
|
|
assert event.pks == str(delete_path).split(",")
|
|
assert (
|
|
await ds_write.client.get(
|
|
"/data/-/query.json?_shape=arrayfirst&sql=select+count(*)+from+{}".format(
|
|
table
|
|
)
|
|
)
|
|
).json() == [0]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"scenario", ("no_token", "no_perm", "bad_table", "cannot_alter")
|
|
)
|
|
async def test_update_row_check_permission(ds_write, scenario):
|
|
if scenario == "no_token":
|
|
token = "bad_token"
|
|
elif scenario == "no_perm":
|
|
token = write_token(ds_write, actor_id="not-root")
|
|
elif scenario == "cannot_alter":
|
|
# update-row but no alter-table:
|
|
token = write_token(ds_write, permissions=["ur"])
|
|
else:
|
|
token = write_token(ds_write)
|
|
|
|
pk = await _insert_row(ds_write)
|
|
|
|
path = "/data/{}/{}/-/update".format(
|
|
"docs" if scenario != "bad_table" else "bad_table", pk
|
|
)
|
|
|
|
json_body = {"update": {"title": "New title"}}
|
|
if scenario == "cannot_alter":
|
|
json_body["alter"] = True
|
|
|
|
response = await ds_write.client.post(
|
|
path,
|
|
json=json_body,
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 403 if scenario in ("no_token", "bad_token") else 404
|
|
assert response.json()["ok"] is False
|
|
assert (
|
|
response.json()["errors"] == ["Permission denied"]
|
|
if scenario == "no_token"
|
|
else ["Table not found"]
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_row_invalid_key(ds_write):
|
|
token = write_token(ds_write)
|
|
|
|
pk = await _insert_row(ds_write)
|
|
|
|
path = "/data/docs/{}/-/update".format(pk)
|
|
response = await ds_write.client.post(
|
|
path,
|
|
json={"update": {"title": "New title"}, "bad_key": 1},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 400
|
|
assert response.json() == {"ok": False, "errors": ["Invalid keys: bad_key"]}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_row_alter(ds_write):
|
|
token = write_token(ds_write, permissions=["ur", "at"])
|
|
pk = await _insert_row(ds_write)
|
|
path = "/data/docs/{}/-/update".format(pk)
|
|
response = await ds_write.client.post(
|
|
path,
|
|
json={"update": {"title": "New title", "extra": "extra"}, "alter": True},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json() == {"ok": True}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_alter_table_operations(ds_write):
|
|
token = write_token(ds_write, permissions=["at"])
|
|
db = ds_write.get_database("data")
|
|
before_schema = await db.execute_fn(
|
|
lambda conn: conn.execute(
|
|
"select sql from sqlite_master where type = 'table' and name = 'docs'"
|
|
).fetchone()[0]
|
|
)
|
|
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={
|
|
"operations": [
|
|
{
|
|
"op": "add_column",
|
|
"args": {
|
|
"name": "slug",
|
|
"type": "text",
|
|
"not_null": True,
|
|
"default": "",
|
|
},
|
|
},
|
|
{
|
|
"op": "add_column",
|
|
"args": {
|
|
"name": "created",
|
|
"type": "text",
|
|
"default_expr": "current_timestamp",
|
|
},
|
|
},
|
|
{
|
|
"op": "add_column",
|
|
"args": {
|
|
"name": "literal_default",
|
|
"type": "text",
|
|
"default": "hello)",
|
|
},
|
|
},
|
|
{"op": "rename_column", "args": {"name": "title", "to": "headline"}},
|
|
{
|
|
"op": "alter_column",
|
|
"args": {"name": "age", "type": "text", "default": "0"},
|
|
},
|
|
{"op": "drop_column", "args": {"name": "score"}},
|
|
{
|
|
"op": "reorder_columns",
|
|
"args": {
|
|
"columns": [
|
|
"id",
|
|
"headline",
|
|
"slug",
|
|
"created",
|
|
"literal_default",
|
|
"age",
|
|
]
|
|
},
|
|
},
|
|
{"op": "set_primary_key", "args": {"columns": ["id"]}},
|
|
]
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
|
|
assert response.status_code == 200, response.text
|
|
data = response.json()
|
|
assert data["ok"] is True
|
|
assert data["database"] == "data"
|
|
assert data["table"] == "docs"
|
|
assert data["altered"] is True
|
|
assert data["operations_applied"] == 8
|
|
assert data["before_schema"] == before_schema
|
|
assert "headline" in data["schema"]
|
|
assert "score" not in data["schema"]
|
|
assert "DEFAULT CURRENT_TIMESTAMP" in data["schema"]
|
|
assert "DEFAULT 'hello)'" in data["schema"]
|
|
|
|
columns = (
|
|
await db.execute("select * from pragma_table_info('docs') order by cid")
|
|
).dicts()
|
|
assert [column["name"] for column in columns] == [
|
|
"id",
|
|
"headline",
|
|
"slug",
|
|
"created",
|
|
"literal_default",
|
|
"age",
|
|
]
|
|
assert columns[0]["pk"] == 1
|
|
assert columns[2]["notnull"] == 1
|
|
assert columns[2]["dflt_value"] == "''"
|
|
assert columns[3]["dflt_value"] == "CURRENT_TIMESTAMP"
|
|
assert columns[4]["dflt_value"] == "'hello)'"
|
|
assert columns[5]["type"] == "TEXT"
|
|
assert columns[5]["dflt_value"] == "'0'"
|
|
|
|
event = last_event(ds_write)
|
|
assert event.name == "alter-table"
|
|
assert event.database == "data"
|
|
assert event.table == "docs"
|
|
assert event.before_schema == before_schema
|
|
assert event.after_schema == data["schema"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_alter_table_dry_run(ds_write):
|
|
token = write_token(ds_write, permissions=["at"])
|
|
db = ds_write.get_database("data")
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={
|
|
"dry_run": True,
|
|
"operations": [
|
|
{"op": "add_column", "args": {"name": "slug", "type": "text"}}
|
|
],
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
data = response.json()
|
|
assert data["ok"] is True
|
|
assert data["dry_run"] is True
|
|
assert data["altered"] is True
|
|
assert data["operations_applied"] == 0
|
|
assert "slug" in data["schema"]
|
|
columns = (
|
|
await db.execute("select name from pragma_table_info('docs') order by cid")
|
|
).dicts()
|
|
assert [column["name"] for column in columns] == ["id", "title", "score", "age"]
|
|
assert last_event(ds_write) is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_alter_table_foreign_key_operations(ds_write):
|
|
token = write_token(ds_write, permissions=["at"])
|
|
db = ds_write.get_database("data")
|
|
await db.execute_write("create table owners (id integer primary key)")
|
|
await db.execute_write("create table categories (id integer primary key)")
|
|
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={
|
|
"operations": [
|
|
{"op": "add_column", "args": {"name": "owner_id", "type": "integer"}},
|
|
{
|
|
"op": "add_foreign_key",
|
|
"args": {"column": "owner_id", "fk_table": "owners"},
|
|
},
|
|
]
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
data = response.json()
|
|
assert data["operations_applied"] == 2
|
|
assert "[owner_id] INTEGER REFERENCES [owners]([id])" in data["schema"]
|
|
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={
|
|
"operations": [{"op": "drop_foreign_key", "args": {"column": "owner_id"}}]
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
data = response.json()
|
|
assert "[owner_id] INTEGER REFERENCES" not in data["schema"]
|
|
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={
|
|
"operations": [
|
|
{
|
|
"op": "set_foreign_keys",
|
|
"args": {
|
|
"foreign_keys": [
|
|
{
|
|
"column": "owner_id",
|
|
"fk_table": "categories",
|
|
"fk_column": "id",
|
|
}
|
|
]
|
|
},
|
|
}
|
|
]
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
data = response.json()
|
|
assert "[owner_id] INTEGER REFERENCES [categories]([id])" in data["schema"]
|
|
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={"operations": [{"op": "set_foreign_keys", "args": {"foreign_keys": []}}]},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
data = response.json()
|
|
assert "[owner_id] INTEGER REFERENCES" not in data["schema"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_alter_table_foreign_key_requires_fk_table_for_fk_column(ds_write):
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={
|
|
"operations": [
|
|
{
|
|
"op": "add_foreign_key",
|
|
"args": {"column": "age", "fk_column": "id"},
|
|
}
|
|
]
|
|
},
|
|
headers=_headers(write_token(ds_write, permissions=["at"])),
|
|
)
|
|
assert response.status_code == 400
|
|
assert response.json() == {
|
|
"ok": False,
|
|
"errors": ["operations.0.add_foreign_key.args: fk_column requires fk_table"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_alter_table_foreign_key_without_fk_column_requires_single_pk(ds_write):
|
|
token = write_token(ds_write, permissions=["at"])
|
|
db = ds_write.get_database("data")
|
|
await db.execute_write(
|
|
"create table accounts (tenant_id integer, id integer, primary key (tenant_id, id))"
|
|
)
|
|
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={
|
|
"operations": [
|
|
{
|
|
"op": "add_foreign_key",
|
|
"args": {"column": "age", "fk_table": "accounts"},
|
|
}
|
|
]
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 400
|
|
assert response.json() == {
|
|
"ok": False,
|
|
"errors": ["Could not detect single primary key for table 'accounts'"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_foreign_key_suggestions(ds_write):
|
|
token = write_token(ds_write, permissions=["at"])
|
|
db = ds_write.get_database("data")
|
|
await db.execute_write("create table owners (id integer primary key)")
|
|
await db.execute_write("insert into owners (id) values (1), (2), (3)")
|
|
await db.execute_write("create table categories (slug text primary key)")
|
|
await db.execute_write("insert into categories (slug) values ('one'), ('two')")
|
|
await db.execute_write("create table numbers (id integer primary key)")
|
|
await db.execute_write("insert into numbers (id) values (10), (20)")
|
|
await db.execute_write("create table weights (id real primary key)")
|
|
await db.execute_write("insert into weights (id) values (1.5), (2.5)")
|
|
await db.execute_write(
|
|
"insert into docs (id, title, score, age) values "
|
|
"(1, 'one', 1.5, 1), (2, 'two', 999.5, 2), (3, null, null, null)"
|
|
)
|
|
|
|
response = await ds_write.client.get(
|
|
"/data/docs/-/foreign-key-suggestions",
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
data = response.json()
|
|
assert data["ok"] is True
|
|
assert data["database"] == "data"
|
|
assert data["table"] == "docs"
|
|
assert data["row_check"]["attempted"] is True
|
|
assert data["row_check"]["status"] == "completed"
|
|
assert data["row_check"]["row_limit"] == 500
|
|
assert data["row_check"]["sampled_rows"] == 3
|
|
|
|
columns = {column["column"]: column for column in data["columns"]}
|
|
assert columns["age"]["options"] == [
|
|
{"fk_table": "numbers", "fk_column": "id", "type": "INTEGER"},
|
|
{"fk_table": "owners", "fk_column": "id", "type": "INTEGER"},
|
|
]
|
|
assert columns["age"]["suggestions"] == [
|
|
{
|
|
"fk_table": "owners",
|
|
"fk_column": "id",
|
|
"confidence": "sampled",
|
|
"sampled_values": 2,
|
|
"reasons": ["type_match", "sample_values_exist"],
|
|
}
|
|
]
|
|
assert columns["title"]["options"] == [
|
|
{"fk_table": "categories", "fk_column": "slug", "type": "TEXT"}
|
|
]
|
|
assert columns["title"]["suggestions"][0]["fk_table"] == "categories"
|
|
assert columns["score"]["options"] == [
|
|
{"fk_table": "weights", "fk_column": "id", "type": "REAL"}
|
|
]
|
|
assert columns["score"]["suggestions"] == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_foreign_key_suggestions_permission_denied(ds_write):
|
|
token = write_token(ds_write, permissions=["ir"])
|
|
response = await ds_write.client.get(
|
|
"/data/docs/-/foreign-key-suggestions",
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 403
|
|
assert response.json() == {
|
|
"ok": False,
|
|
"errors": ["Permission denied: need alter-table"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_foreign_key_suggestions_fail_open(ds_write, monkeypatch):
|
|
token = write_token(ds_write, permissions=["at"])
|
|
db = ds_write.get_database("data")
|
|
await db.execute_write("create table owners (id integer primary key)")
|
|
|
|
async def raise_timeout(*args, **kwargs):
|
|
raise table_create_alter.ForeignKeySuggestionTimedOut
|
|
|
|
from datasette.views import table_create_alter
|
|
|
|
monkeypatch.setattr(
|
|
table_create_alter,
|
|
"_foreign_key_suggestion_samples",
|
|
raise_timeout,
|
|
)
|
|
|
|
response = await ds_write.client.get(
|
|
"/data/docs/-/foreign-key-suggestions",
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
data = response.json()
|
|
assert data["row_check"]["status"] == "timed_out"
|
|
columns = {column["column"]: column for column in data["columns"]}
|
|
assert columns["age"]["options"] == [
|
|
{"fk_table": "owners", "fk_column": "id", "type": "INTEGER"}
|
|
]
|
|
assert columns["age"]["suggestions"] == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_foreign_key_targets(ds_write):
|
|
token = write_token(ds_write, permissions=["ct"])
|
|
db = ds_write.get_database("data")
|
|
await db.execute_write("create table owners (id integer primary key)")
|
|
await db.execute_write("create table categories (slug varchar(30) primary key)")
|
|
await db.execute_write("create table blob_things (hash blob primary key)")
|
|
await db.execute_write(
|
|
"create table numeric_codes (code decimal(10,5) primary key)"
|
|
)
|
|
await db.execute_write(
|
|
'create table floating_point (value "FLOATING POINT" primary key)'
|
|
)
|
|
await db.execute_write(
|
|
"create table compound (a integer, b integer, primary key (a, b))"
|
|
)
|
|
await db.execute_write("create table no_pk (name text)")
|
|
|
|
response = await ds_write.client.get(
|
|
"/data/-/foreign-key-targets",
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 200, response.text
|
|
assert response.json() == {
|
|
"ok": True,
|
|
"database": "data",
|
|
"targets": [
|
|
{
|
|
"fk_table": "blob_things",
|
|
"fk_column": "hash",
|
|
"type": "blob",
|
|
},
|
|
{
|
|
"fk_table": "categories",
|
|
"fk_column": "slug",
|
|
"type": "text",
|
|
},
|
|
{
|
|
"fk_table": "docs",
|
|
"fk_column": "id",
|
|
"type": "integer",
|
|
},
|
|
{
|
|
"fk_table": "floating_point",
|
|
"fk_column": "value",
|
|
"type": "integer",
|
|
},
|
|
{
|
|
"fk_table": "numeric_codes",
|
|
"fk_column": "code",
|
|
"type": "numeric",
|
|
},
|
|
{
|
|
"fk_table": "owners",
|
|
"fk_column": "id",
|
|
"type": "integer",
|
|
},
|
|
],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_foreign_key_targets_permission_denied(ds_write):
|
|
token = write_token(ds_write, permissions=["ir"])
|
|
response = await ds_write.client.get(
|
|
"/data/-/foreign-key-targets",
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 403
|
|
assert response.json() == {
|
|
"ok": False,
|
|
"errors": ["Permission denied: need create-table"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_alter_table_permission_denied(ds_write):
|
|
token = write_token(ds_write, permissions=["ir"])
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json={"operations": [{"op": "add_column", "args": {"name": "slug"}}]},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 403
|
|
assert response.json() == {
|
|
"ok": False,
|
|
"errors": ["Permission denied: need alter-table"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"body,expected_error",
|
|
(
|
|
(
|
|
{"operations": [{"op": "add_column", "args": {"type": "text"}}]},
|
|
"operations.0.add_column.args.name: Field required",
|
|
),
|
|
(
|
|
{
|
|
"operations": [
|
|
{"op": "add_column", "args": {"name": "x", "type": "bad"}}
|
|
]
|
|
},
|
|
"operations.0.add_column.args.type: Input should be 'text', 'integer', 'float' or 'blob'",
|
|
),
|
|
(
|
|
{
|
|
"operations": [
|
|
{
|
|
"op": "add_column",
|
|
"args": {
|
|
"name": "x",
|
|
"default_expr": "datetime('now')",
|
|
},
|
|
}
|
|
]
|
|
},
|
|
"operations.0.add_column.args.default_expr: Input should be 'current_timestamp', 'current_date' or 'current_time'",
|
|
),
|
|
(
|
|
{
|
|
"operations": [
|
|
{
|
|
"op": "add_column",
|
|
"args": {
|
|
"name": "x",
|
|
"default": "x",
|
|
"default_expr": "current_timestamp",
|
|
},
|
|
}
|
|
]
|
|
},
|
|
"operations.0.add_column.args: Value error, default and default_expr cannot both be provided",
|
|
),
|
|
),
|
|
)
|
|
async def test_alter_table_validation_errors(ds_write, body, expected_error):
|
|
response = await ds_write.client.post(
|
|
"/data/docs/-/alter",
|
|
json=body,
|
|
headers=_headers(write_token(ds_write, permissions=["at"])),
|
|
)
|
|
assert response.status_code == 400
|
|
assert response.json()["ok"] is False
|
|
assert response.json()["errors"] == [expected_error]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_execute_write_form_parameter_called_sql():
|
|
ds = Datasette(memory=True, default_deny=True)
|
|
ds.root_enabled = True
|
|
db = ds.add_memory_database("execute_write_parameter_sql", name="data")
|
|
await db.execute_write("create table docs (id integer primary key, title text)")
|
|
await db.execute_write("insert into docs (id, title) values (1, 'Initial')")
|
|
await ds.invoke_startup()
|
|
|
|
form_response = await ds.client.get(
|
|
"/data/-/execute-write",
|
|
actor={"id": "root"},
|
|
params={"sql": "update docs set title = :sql where id = :id"},
|
|
)
|
|
assert form_response.status_code == 200
|
|
assert 'data-parameter-name-prefix="_sql_param_"' in form_response.text
|
|
assert '<label for="qp1">sql</label>' in form_response.text
|
|
assert 'name="_sql_param_sql"' in form_response.text
|
|
assert 'data-parameter-name="sql"' in form_response.text
|
|
assert 'name="_sql_param_id"' in form_response.text
|
|
|
|
response = await ds.client.post(
|
|
"/data/-/execute-write",
|
|
actor={"id": "root"},
|
|
data={
|
|
"sql": "update docs set title = :sql where id = :id",
|
|
"_sql_param_sql": "Updated",
|
|
"_sql_param_id": "1",
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 200
|
|
assert "Query executed, 1 row affected" in response.text
|
|
assert (await db.execute("select title from docs where id = 1")).first()[
|
|
0
|
|
] == "Updated"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"input,expected_errors",
|
|
(
|
|
({"title": "New title"}, None),
|
|
({"title": None}, None),
|
|
({"score": 1.6}, None),
|
|
({"age": 10}, None),
|
|
({"title": "New title", "score": 1.6}, None),
|
|
({"title2": "New title"}, ["no such column: title2"]),
|
|
),
|
|
)
|
|
@pytest.mark.parametrize("use_return", (True, False))
|
|
async def test_update_row(ds_write, input, expected_errors, use_return):
|
|
token = write_token(ds_write)
|
|
pk = await _insert_row(ds_write)
|
|
|
|
path = "/data/docs/{}/-/update".format(pk)
|
|
|
|
data = {"update": input}
|
|
if use_return:
|
|
data["return"] = True
|
|
|
|
response = await ds_write.client.post(
|
|
path,
|
|
json=data,
|
|
headers=_headers(token),
|
|
)
|
|
if expected_errors:
|
|
assert response.status_code == 400
|
|
assert response.json()["ok"] is False
|
|
assert response.json()["errors"] == expected_errors
|
|
return
|
|
|
|
assert response.json()["ok"] is True
|
|
if not use_return:
|
|
assert "row" not in response.json()
|
|
else:
|
|
returned_row = response.json()["row"]
|
|
assert returned_row["id"] == pk
|
|
for k, v in input.items():
|
|
assert returned_row[k] == v
|
|
|
|
# Analytics event
|
|
event = last_event(ds_write)
|
|
assert event.actor == {"id": "root", "token": "dstok"}
|
|
assert event.database == "data"
|
|
assert event.table == "docs"
|
|
assert event.pks == [str(pk)]
|
|
|
|
# And fetch the row to check it's updated
|
|
response = await ds_write.client.get(
|
|
"/data/docs/{}.json?_shape=array".format(pk),
|
|
)
|
|
assert response.status_code == 200
|
|
row = response.json()[0]
|
|
assert row["id"] == pk
|
|
for k, v in input.items():
|
|
assert row[k] == v
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"scenario", ("no_token", "no_perm", "bad_table", "has_perm", "immutable")
|
|
)
|
|
async def test_drop_table(ds_write, scenario):
|
|
if scenario == "no_token":
|
|
token = "bad_token"
|
|
elif scenario == "no_perm":
|
|
token = write_token(ds_write, actor_id="not-root")
|
|
else:
|
|
token = write_token(ds_write)
|
|
should_work = scenario == "has_perm"
|
|
await ds_write.get_database("data").execute_write(
|
|
"insert into docs (id, title) values (1, 'Row 1')"
|
|
)
|
|
path = "/{database}/{table}/-/drop".format(
|
|
database="immutable" if scenario == "immutable" else "data",
|
|
table="docs" if scenario != "bad_table" else "bad_table",
|
|
)
|
|
response = await ds_write.client.post(
|
|
path,
|
|
headers=_headers(token),
|
|
)
|
|
if not should_work:
|
|
assert (
|
|
response.status_code == 403
|
|
if scenario in ("no_token", "bad_token")
|
|
else 404
|
|
)
|
|
assert response.json()["ok"] is False
|
|
expected_error = "Permission denied"
|
|
if scenario == "bad_table":
|
|
expected_error = "Table not found"
|
|
elif scenario == "immutable":
|
|
expected_error = "Database is immutable"
|
|
assert response.json()["errors"] == [expected_error]
|
|
assert (await ds_write.client.get("/data/docs")).status_code == 200
|
|
else:
|
|
# It should show a confirmation page
|
|
assert response.status_code == 200
|
|
assert response.json() == {
|
|
"ok": True,
|
|
"database": "data",
|
|
"table": "docs",
|
|
"row_count": 1,
|
|
"message": 'Pass "confirm": true to confirm',
|
|
}
|
|
assert (await ds_write.client.get("/data/docs")).status_code == 200
|
|
# Now send confirm: true
|
|
response2 = await ds_write.client.post(
|
|
path,
|
|
json={"confirm": True},
|
|
headers=_headers(token),
|
|
)
|
|
assert response2.json() == {"ok": True}
|
|
# Check event
|
|
event = last_event(ds_write)
|
|
assert event.name == "drop-table"
|
|
assert event.actor == {"id": "root", "token": "dstok"}
|
|
assert event.table == "docs"
|
|
assert event.database == "data"
|
|
# Table should 404
|
|
assert (await ds_write.client.get("/data/docs")).status_code == 404
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"input,expected_status,expected_response,expected_events",
|
|
(
|
|
# Permission error with a bad token
|
|
(
|
|
{"table": "bad", "row": {"id": 1}},
|
|
403,
|
|
{"ok": False, "errors": ["Permission denied"]},
|
|
[],
|
|
),
|
|
# Successful creation with columns:
|
|
(
|
|
{
|
|
"table": "one",
|
|
"columns": [
|
|
{
|
|
"name": "id",
|
|
"type": "integer",
|
|
},
|
|
{
|
|
"name": "title",
|
|
"type": "text",
|
|
},
|
|
{
|
|
"name": "score",
|
|
"type": "integer",
|
|
},
|
|
{
|
|
"name": "weight",
|
|
"type": "float",
|
|
},
|
|
{
|
|
"name": "thumbnail",
|
|
"type": "blob",
|
|
},
|
|
],
|
|
"pk": "id",
|
|
},
|
|
201,
|
|
{
|
|
"ok": True,
|
|
"database": "data",
|
|
"table": "one",
|
|
"table_url": "http://localhost/data/one",
|
|
"table_api_url": "http://localhost/data/one.json",
|
|
"schema": (
|
|
"CREATE TABLE [one] (\n"
|
|
" [id] INTEGER PRIMARY KEY,\n"
|
|
" [title] TEXT,\n"
|
|
" [score] INTEGER,\n"
|
|
" [weight] FLOAT,\n"
|
|
" [thumbnail] BLOB\n"
|
|
")"
|
|
),
|
|
},
|
|
["create-table"],
|
|
),
|
|
# Successful creation with rows:
|
|
(
|
|
{
|
|
"table": "two",
|
|
"rows": [
|
|
{
|
|
"id": 1,
|
|
"title": "Row 1",
|
|
"score": 1.5,
|
|
},
|
|
{
|
|
"id": 2,
|
|
"title": "Row 2",
|
|
"score": 1.5,
|
|
},
|
|
],
|
|
"pk": "id",
|
|
},
|
|
201,
|
|
{
|
|
"ok": True,
|
|
"database": "data",
|
|
"table": "two",
|
|
"table_url": "http://localhost/data/two",
|
|
"table_api_url": "http://localhost/data/two.json",
|
|
"schema": (
|
|
"CREATE TABLE [two] (\n"
|
|
" [id] INTEGER PRIMARY KEY,\n"
|
|
" [title] TEXT,\n"
|
|
" [score] FLOAT\n"
|
|
")"
|
|
),
|
|
"row_count": 2,
|
|
},
|
|
["create-table", "insert-rows"],
|
|
),
|
|
# Successful creation with row:
|
|
(
|
|
{
|
|
"table": "three",
|
|
"row": {
|
|
"id": 1,
|
|
"title": "Row 1",
|
|
"score": 1.5,
|
|
},
|
|
"pk": "id",
|
|
},
|
|
201,
|
|
{
|
|
"ok": True,
|
|
"database": "data",
|
|
"table": "three",
|
|
"table_url": "http://localhost/data/three",
|
|
"table_api_url": "http://localhost/data/three.json",
|
|
"schema": (
|
|
"CREATE TABLE [three] (\n"
|
|
" [id] INTEGER PRIMARY KEY,\n"
|
|
" [title] TEXT,\n"
|
|
" [score] FLOAT\n"
|
|
")"
|
|
),
|
|
"row_count": 1,
|
|
},
|
|
["create-table", "insert-rows"],
|
|
),
|
|
# Create with row and no primary key
|
|
(
|
|
{
|
|
"table": "four",
|
|
"row": {
|
|
"name": "Row 1",
|
|
},
|
|
},
|
|
201,
|
|
{
|
|
"ok": True,
|
|
"database": "data",
|
|
"table": "four",
|
|
"table_url": "http://localhost/data/four",
|
|
"table_api_url": "http://localhost/data/four.json",
|
|
"schema": ("CREATE TABLE [four] (\n" " [name] TEXT\n" ")"),
|
|
"row_count": 1,
|
|
},
|
|
["create-table", "insert-rows"],
|
|
),
|
|
# Create table with compound primary key
|
|
(
|
|
{
|
|
"table": "five",
|
|
"row": {"type": "article", "key": 123, "title": "Article 1"},
|
|
"pks": ["type", "key"],
|
|
},
|
|
201,
|
|
{
|
|
"ok": True,
|
|
"database": "data",
|
|
"table": "five",
|
|
"table_url": "http://localhost/data/five",
|
|
"table_api_url": "http://localhost/data/five.json",
|
|
"schema": (
|
|
"CREATE TABLE [five] (\n [type] TEXT,\n [key] INTEGER,\n"
|
|
" [title] TEXT,\n PRIMARY KEY ([type], [key])\n)"
|
|
),
|
|
"row_count": 1,
|
|
},
|
|
["create-table", "insert-rows"],
|
|
),
|
|
# Error: Table is required
|
|
(
|
|
{
|
|
"row": {"id": 1},
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["Table is required"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: Invalid table name
|
|
(
|
|
{
|
|
"table": "sqlite_bad_name",
|
|
"row": {"id": 1},
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["Invalid table name"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: JSON must be an object
|
|
(
|
|
[],
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["JSON must be an object"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: Cannot specify columns with rows or row
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"columns": [{"name": "id", "type": "integer"}],
|
|
"rows": [{"id": 1}],
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["Cannot specify columns with rows or row"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: columns, rows or row is required
|
|
(
|
|
{
|
|
"table": "bad",
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["columns, rows or row is required"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: columns must be a list
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"columns": {"name": "id", "type": "integer"},
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["columns must be a list"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: columns must be a list of objects
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"columns": ["id"],
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["columns must be a list of objects"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: Column name is required
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"columns": [{"type": "integer"}],
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["Column name is required"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: Unsupported column type
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"columns": [{"name": "id", "type": "bad"}],
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["Unsupported column type: bad"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: Duplicate column name
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"columns": [
|
|
{"name": "id", "type": "integer"},
|
|
{"name": "id", "type": "integer"},
|
|
],
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["Duplicate column name: id"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: rows must be a list
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"rows": {"id": 1},
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["rows must be a list"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: rows must be a list of objects
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"rows": ["id"],
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["rows must be a list of objects"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: pk must be a string
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"row": {"id": 1},
|
|
"pk": 1,
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["pk must be a string"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: Cannot specify both pk and pks
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"row": {"id": 1, "name": "Row 1"},
|
|
"pk": "id",
|
|
"pks": ["id", "name"],
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["Cannot specify both pk and pks"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: pks must be a list
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"row": {"id": 1, "name": "Row 1"},
|
|
"pks": "id",
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["pks must be a list"],
|
|
},
|
|
[],
|
|
),
|
|
# Error: pks must be a list of strings
|
|
(
|
|
{"table": "bad", "row": {"id": 1, "name": "Row 1"}, "pks": [1, 2]},
|
|
400,
|
|
{"ok": False, "errors": ["pks must be a list of strings"]},
|
|
[],
|
|
),
|
|
# Error: ignore and replace are mutually exclusive
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"row": {"id": 1, "name": "Row 1"},
|
|
"pk": "id",
|
|
"ignore": True,
|
|
"replace": True,
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["ignore and replace are mutually exclusive"],
|
|
},
|
|
[],
|
|
),
|
|
# ignore and replace require row or rows
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"columns": [{"name": "id", "type": "integer"}],
|
|
"ignore": True,
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["ignore and replace require row or rows"],
|
|
},
|
|
[],
|
|
),
|
|
# ignore and replace require pk or pks
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"row": {"id": 1},
|
|
"ignore": True,
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["ignore and replace require pk or pks"],
|
|
},
|
|
[],
|
|
),
|
|
(
|
|
{
|
|
"table": "bad",
|
|
"row": {"id": 1},
|
|
"replace": True,
|
|
},
|
|
400,
|
|
{
|
|
"ok": False,
|
|
"errors": ["ignore and replace require pk or pks"],
|
|
},
|
|
[],
|
|
),
|
|
),
|
|
)
|
|
async def test_create_table(
|
|
ds_write, input, expected_status, expected_response, expected_events
|
|
):
|
|
ds_write._tracked_events = []
|
|
# Special case for expected status of 403
|
|
if expected_status == 403:
|
|
token = "bad_token"
|
|
else:
|
|
token = write_token(ds_write)
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json=input,
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == expected_status
|
|
data = response.json()
|
|
assert data == expected_response
|
|
# Should have tracked the expected events
|
|
events = ds_write._tracked_events
|
|
assert [e.name for e in events] == expected_events
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_table_with_foreign_key(ds_write):
|
|
token = write_token(ds_write)
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"table": "owners",
|
|
"columns": [
|
|
{"name": "id", "type": "integer"},
|
|
{"name": "name", "type": "text"},
|
|
],
|
|
"pk": "id",
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"table": "projects",
|
|
"columns": [
|
|
{"name": "id", "type": "integer"},
|
|
{
|
|
"name": "owner_id",
|
|
"type": "integer",
|
|
"fk_table": "owners",
|
|
},
|
|
{"name": "title", "type": "text"},
|
|
],
|
|
"pk": "id",
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 201
|
|
data = response.json()
|
|
assert "[owner_id] INTEGER REFERENCES [owners]([id])" in data["schema"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"column,expected_error",
|
|
(
|
|
(
|
|
{"name": "owner_id", "type": "integer", "fk_table": "owners"},
|
|
None,
|
|
),
|
|
(
|
|
{"name": "owner_id", "type": "integer", "fk_column": "id"},
|
|
"columns.0: fk_column requires fk_table",
|
|
),
|
|
),
|
|
)
|
|
async def test_create_table_foreign_key_validation(ds_write, column, expected_error):
|
|
token = write_token(ds_write)
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"table": "projects",
|
|
"columns": [column],
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
if expected_error:
|
|
assert response.status_code == 400
|
|
assert response.json() == {"ok": False, "errors": [expected_error]}
|
|
else:
|
|
assert response.status_code == 400
|
|
assert response.json() == {
|
|
"ok": False,
|
|
"errors": ["Could not detect single primary key for table 'owners'"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_table_foreign_key_without_fk_column_requires_single_pk(ds_write):
|
|
token = write_token(ds_write)
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"table": "accounts",
|
|
"columns": [
|
|
{"name": "tenant_id", "type": "integer"},
|
|
{"name": "id", "type": "integer"},
|
|
{"name": "name", "type": "text"},
|
|
],
|
|
"pks": ["tenant_id", "id"],
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"table": "projects",
|
|
"columns": [
|
|
{"name": "id", "type": "integer"},
|
|
{
|
|
"name": "account_id",
|
|
"type": "integer",
|
|
"fk_table": "accounts",
|
|
},
|
|
],
|
|
"pk": "id",
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 400
|
|
assert response.json() == {
|
|
"ok": False,
|
|
"errors": ["Could not detect single primary key for table 'accounts'"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"permissions,body,expected_status,expected_errors",
|
|
(
|
|
(["create-table"], {"table": "t", "columns": [{"name": "c"}]}, 201, None),
|
|
# Need insert-row too if you use "rows":
|
|
(
|
|
["create-table"],
|
|
{"table": "t", "rows": [{"name": "c"}]},
|
|
403,
|
|
["Permission denied: need insert-row"],
|
|
),
|
|
# This should work:
|
|
(
|
|
["create-table", "insert-row"],
|
|
{"table": "t", "rows": [{"name": "c"}]},
|
|
201,
|
|
None,
|
|
),
|
|
# If you use replace: true you need update-row too:
|
|
(
|
|
["create-table", "insert-row"],
|
|
{"table": "t", "rows": [{"id": 1}], "pk": "id", "replace": True},
|
|
403,
|
|
["Permission denied: need update-row"],
|
|
),
|
|
),
|
|
)
|
|
async def test_create_table_permissions(
|
|
ds_write, permissions, body, expected_status, expected_errors
|
|
):
|
|
from datasette.tokens import TokenRestrictions
|
|
|
|
restrictions = TokenRestrictions()
|
|
for action in ["view-instance"] + permissions:
|
|
restrictions.allow_all(action)
|
|
token = await ds_write.create_token(
|
|
"root", handler="signed", restrictions=restrictions
|
|
)
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json=body,
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == expected_status
|
|
if expected_errors:
|
|
data = response.json()
|
|
assert data["ok"] is False
|
|
assert data["errors"] == expected_errors
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.xfail(reason="Flaky, see https://github.com/simonw/datasette/issues/2356")
|
|
@pytest.mark.parametrize(
|
|
"input,expected_rows_after",
|
|
(
|
|
(
|
|
{
|
|
"table": "test_insert_replace",
|
|
"rows": [
|
|
{"id": 1, "name": "Row 1 new"},
|
|
{"id": 3, "name": "Row 3 new"},
|
|
],
|
|
"pk": "id",
|
|
"ignore": True,
|
|
},
|
|
[
|
|
{"id": 1, "name": "Row 1"},
|
|
{"id": 2, "name": "Row 2"},
|
|
{"id": 3, "name": "Row 3 new"},
|
|
],
|
|
),
|
|
(
|
|
{
|
|
"table": "test_insert_replace",
|
|
"rows": [
|
|
{"id": 1, "name": "Row 1 new"},
|
|
{"id": 3, "name": "Row 3 new"},
|
|
],
|
|
"pk": "id",
|
|
"replace": True,
|
|
},
|
|
[
|
|
{"id": 1, "name": "Row 1 new"},
|
|
{"id": 2, "name": "Row 2"},
|
|
{"id": 3, "name": "Row 3 new"},
|
|
],
|
|
),
|
|
),
|
|
)
|
|
async def test_create_table_ignore_replace(ds_write, input, expected_rows_after):
|
|
# Create table with two rows
|
|
token = write_token(ds_write)
|
|
first_response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"rows": [{"id": 1, "name": "Row 1"}, {"id": 2, "name": "Row 2"}],
|
|
"table": "test_insert_replace",
|
|
"pk": "id",
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert first_response.status_code == 201
|
|
|
|
ds_write._tracked_events = []
|
|
|
|
# Try a second time
|
|
second_response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json=input,
|
|
headers=_headers(token),
|
|
)
|
|
assert second_response.status_code == 201
|
|
# Check that the rows are as expected
|
|
rows = await ds_write.client.get("/data/test_insert_replace.json?_shape=array")
|
|
assert rows.json() == expected_rows_after
|
|
|
|
# Check it fired the right events
|
|
event_names = [e.name for e in ds_write._tracked_events]
|
|
assert event_names == ["insert-rows"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_table_error_if_pk_changed(ds_write):
|
|
token = write_token(ds_write)
|
|
first_response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"rows": [{"id": 1, "name": "Row 1"}, {"id": 2, "name": "Row 2"}],
|
|
"table": "test_insert_replace",
|
|
"pk": "id",
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert first_response.status_code == 201
|
|
# Try a second time with a different pk
|
|
second_response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"rows": [{"id": 1, "name": "Row 1"}, {"id": 2, "name": "Row 2"}],
|
|
"table": "test_insert_replace",
|
|
"pk": "name",
|
|
"replace": True,
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert second_response.status_code == 400
|
|
assert second_response.json() == {
|
|
"ok": False,
|
|
"errors": ["pk cannot be changed for existing table"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_table_error_rows_twice_with_duplicates(ds_write):
|
|
# Error if you don't send ignore: True or replace: True
|
|
token = write_token(ds_write)
|
|
input = {
|
|
"rows": [{"id": 1, "name": "Row 1"}, {"id": 2, "name": "Row 2"}],
|
|
"table": "test_create_twice",
|
|
"pk": "id",
|
|
}
|
|
first_response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json=input,
|
|
headers=_headers(token),
|
|
)
|
|
assert first_response.status_code == 201
|
|
second_response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json=input,
|
|
headers=_headers(token),
|
|
)
|
|
assert second_response.status_code == 400
|
|
assert second_response.json() == {
|
|
"ok": False,
|
|
"errors": ["UNIQUE constraint failed: test_create_twice.id"],
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize(
|
|
"path",
|
|
(
|
|
"/data/-/create",
|
|
"/data/docs/-/drop",
|
|
"/data/docs/-/insert",
|
|
),
|
|
)
|
|
async def test_method_not_allowed(ds_write, path):
|
|
response = await ds_write.client.get(
|
|
path,
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
},
|
|
)
|
|
assert response.status_code == 405
|
|
assert response.json() == {
|
|
"ok": False,
|
|
"error": "Method not allowed",
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_uses_alter_by_default_for_new_table(ds_write):
|
|
ds_write._tracked_events = []
|
|
token = write_token(ds_write)
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"table": "new_table",
|
|
"rows": [
|
|
{
|
|
"name": "Row 1",
|
|
}
|
|
]
|
|
* 100
|
|
+ [
|
|
{"name": "Row 2", "extra": "Extra"},
|
|
],
|
|
"pk": "id",
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 201
|
|
event_names = [e.name for e in ds_write._tracked_events]
|
|
assert event_names == ["create-table", "insert-rows"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("has_alter_permission", (True, False))
|
|
async def test_create_using_alter_against_existing_table(
|
|
ds_write, has_alter_permission
|
|
):
|
|
token = write_token(
|
|
ds_write, permissions=["ir", "ct"] + (["at"] if has_alter_permission else [])
|
|
)
|
|
# First create the table
|
|
response = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"table": "new_table",
|
|
"rows": [
|
|
{
|
|
"name": "Row 1",
|
|
}
|
|
],
|
|
"pk": "id",
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
ds_write._tracked_events = []
|
|
# Now try to insert more rows using /-/create with alter=True
|
|
response2 = await ds_write.client.post(
|
|
"/data/-/create",
|
|
json={
|
|
"table": "new_table",
|
|
"rows": [{"name": "Row 2", "extra": "extra"}],
|
|
"pk": "id",
|
|
"alter": True,
|
|
},
|
|
headers=_headers(token),
|
|
)
|
|
if not has_alter_permission:
|
|
assert response2.status_code == 403
|
|
assert response2.json() == {
|
|
"ok": False,
|
|
"errors": ["Permission denied: need alter-table"],
|
|
}
|
|
else:
|
|
assert response2.status_code == 201
|
|
|
|
event_names = [e.name for e in ds_write._tracked_events]
|
|
assert event_names == ["alter-table", "insert-rows"]
|
|
|
|
# It should have altered the table
|
|
alter_event = ds_write._tracked_events[0]
|
|
assert alter_event.name == "alter-table"
|
|
assert "extra" not in alter_event.before_schema
|
|
assert "extra" in alter_event.after_schema
|
|
|
|
insert_rows_event = ds_write._tracked_events[1]
|
|
assert insert_rows_event.name == "insert-rows"
|
|
assert insert_rows_event.num_rows == 1
|