mirror of
https://github.com/simonw/datasette.git
synced 2025-12-10 16:51:24 +01:00
/db/table/pk/-/update endpoint, closes #1863
This commit is contained in:
parent
0fe1619910
commit
484bef0d3b
6 changed files with 269 additions and 43 deletions
|
|
@ -14,7 +14,7 @@ def ds_write(tmp_path_factory):
|
|||
for db in (db1, db2):
|
||||
db.execute("vacuum")
|
||||
db.execute(
|
||||
"create table docs (id integer primary key, title text, score float)"
|
||||
"create table docs (id integer primary key, title text, score float, age integer)"
|
||||
)
|
||||
ds = Datasette([db_path], immutables=[db_path_immutable])
|
||||
yield ds
|
||||
|
|
@ -34,13 +34,13 @@ async def test_write_row(ds_write):
|
|||
token = write_token(ds_write)
|
||||
response = await ds_write.client.post(
|
||||
"/data/docs/-/insert",
|
||||
json={"row": {"title": "Test", "score": 1.0}},
|
||||
json={"row": {"title": "Test", "score": 1.2, "age": 5}},
|
||||
headers={
|
||||
"Authorization": "Bearer {}".format(token),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
expected_row = {"id": 1, "title": "Test", "score": 1.0}
|
||||
expected_row = {"id": 1, "title": "Test", "score": 1.2, "age": 5}
|
||||
assert response.status_code == 201
|
||||
assert response.json()["rows"] == [expected_row]
|
||||
rows = (await ds_write.get_database("data").execute("select * from docs")).rows
|
||||
|
|
@ -51,7 +51,11 @@ async def test_write_row(ds_write):
|
|||
@pytest.mark.parametrize("return_rows", (True, False))
|
||||
async def test_write_rows(ds_write, return_rows):
|
||||
token = write_token(ds_write)
|
||||
data = {"rows": [{"title": "Test {}".format(i), "score": 1.0} for i in range(20)]}
|
||||
data = {
|
||||
"rows": [
|
||||
{"title": "Test {}".format(i), "score": 1.0, "age": 5} for i in range(20)
|
||||
]
|
||||
}
|
||||
if return_rows:
|
||||
data["return"] = True
|
||||
response = await ds_write.client.post(
|
||||
|
|
@ -71,7 +75,8 @@ async def test_write_rows(ds_write, return_rows):
|
|||
]
|
||||
assert len(actual_rows) == 20
|
||||
assert actual_rows == [
|
||||
{"id": i + 1, "title": "Test {}".format(i), "score": 1.0} for i in range(20)
|
||||
{"id": i + 1, "title": "Test {}".format(i), "score": 1.0, "age": 5}
|
||||
for i in range(20)
|
||||
]
|
||||
assert response.json()["ok"] is True
|
||||
if return_rows:
|
||||
|
|
@ -241,14 +246,14 @@ async def test_write_row_errors(
|
|||
True,
|
||||
False,
|
||||
[
|
||||
{"id": 1, "title": "Exists", "score": None},
|
||||
{"id": 1, "title": "Exists", "score": None, "age": None},
|
||||
],
|
||||
),
|
||||
(
|
||||
False,
|
||||
True,
|
||||
[
|
||||
{"id": 1, "title": "One", "score": None},
|
||||
{"id": 1, "title": "One", "score": None, "age": None},
|
||||
],
|
||||
),
|
||||
),
|
||||
|
|
@ -289,6 +294,19 @@ async def test_insert_ignore_replace(
|
|||
assert response.json()["rows"] == expected_rows
|
||||
|
||||
|
||||
async def _insert_row(ds):
|
||||
insert_response = await ds.client.post(
|
||||
"/data/docs/-/insert",
|
||||
json={"row": {"title": "Row one", "score": 1.2, "age": 5}, "return": True},
|
||||
headers={
|
||||
"Authorization": "Bearer {}".format(write_token(ds)),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert insert_response.status_code == 201
|
||||
return insert_response.json()["rows"][0]["id"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("scenario", ("no_token", "no_perm", "bad_table", "has_perm"))
|
||||
async def test_delete_row(ds_write, scenario):
|
||||
|
|
@ -300,17 +318,7 @@ async def test_delete_row(ds_write, scenario):
|
|||
token = write_token(ds_write)
|
||||
should_work = scenario == "has_perm"
|
||||
|
||||
# Insert a row
|
||||
insert_response = await ds_write.client.post(
|
||||
"/data/docs/-/insert",
|
||||
json={"row": {"title": "Row one", "score": 1.0}, "return": True},
|
||||
headers={
|
||||
"Authorization": "Bearer {}".format(write_token(ds_write)),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert insert_response.status_code == 201
|
||||
pk = insert_response.json()["rows"][0]["id"]
|
||||
pk = await _insert_row(ds_write)
|
||||
|
||||
path = "/data/{}/{}/-/delete".format(
|
||||
"docs" if scenario != "bad_table" else "bad_table", pk
|
||||
|
|
@ -343,6 +351,96 @@ async def test_delete_row(ds_write, scenario):
|
|||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("scenario", ("no_token", "no_perm", "bad_table"))
|
||||
async def test_update_row_check_permission(ds_write, scenario):
|
||||
if scenario == "no_token":
|
||||
token = "bad_token"
|
||||
elif scenario == "no_perm":
|
||||
token = write_token(ds_write, actor_id="not-root")
|
||||
else:
|
||||
token = write_token(ds_write)
|
||||
|
||||
pk = await _insert_row(ds_write)
|
||||
|
||||
path = "/data/{}/{}/-/delete".format(
|
||||
"docs" if scenario != "bad_table" else "bad_table", pk
|
||||
)
|
||||
|
||||
response = await ds_write.client.post(
|
||||
path,
|
||||
json={"update": {"title": "New title"}},
|
||||
headers={
|
||||
"Authorization": "Bearer {}".format(token),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 403 if scenario in ("no_token", "bad_token") else 404
|
||||
assert response.json()["ok"] is False
|
||||
assert (
|
||||
response.json()["errors"] == ["Permission denied"]
|
||||
if scenario == "no_token"
|
||||
else ["Table not found: bad_table"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"input,expected_errors",
|
||||
(
|
||||
({"title": "New title"}, None),
|
||||
({"title": None}, None),
|
||||
({"score": 1.6}, None),
|
||||
({"age": 10}, None),
|
||||
({"title": "New title", "score": 1.6}, None),
|
||||
({"title2": "New title"}, ["no such column: title2"]),
|
||||
),
|
||||
)
|
||||
@pytest.mark.parametrize("use_return", (True, False))
|
||||
async def test_update_row(ds_write, input, expected_errors, use_return):
|
||||
token = write_token(ds_write)
|
||||
pk = await _insert_row(ds_write)
|
||||
|
||||
path = "/data/docs/{}/-/update".format(pk)
|
||||
|
||||
data = {"update": input}
|
||||
if use_return:
|
||||
data["return"] = True
|
||||
|
||||
response = await ds_write.client.post(
|
||||
path,
|
||||
json=data,
|
||||
headers={
|
||||
"Authorization": "Bearer {}".format(token),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
if expected_errors:
|
||||
assert response.status_code == 400
|
||||
assert response.json()["ok"] is False
|
||||
assert response.json()["errors"] == expected_errors
|
||||
return
|
||||
|
||||
assert response.json()["ok"] is True
|
||||
if not use_return:
|
||||
assert "row" not in response.json()
|
||||
else:
|
||||
returned_row = response.json()["row"]
|
||||
assert returned_row["id"] == pk
|
||||
for k, v in input.items():
|
||||
assert returned_row[k] == v
|
||||
|
||||
# And fetch the row to check it's updated
|
||||
response = await ds_write.client.get(
|
||||
"/data/docs/{}.json?_shape=array".format(pk),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
row = response.json()[0]
|
||||
assert row["id"] == pk
|
||||
for k, v in input.items():
|
||||
assert row[k] == v
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"scenario", ("no_token", "no_perm", "bad_table", "has_perm", "immutable")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue