/db/-/create API endpoint, closes #1882

This commit is contained in:
Simon Willison 2022-11-14 21:57:28 -08:00
commit 52bf222d48

View file

@ -704,3 +704,132 @@ async def _table_columns(datasette, database_name):
for row in result.rows:
table_columns.setdefault(row["table_name"], []).append(row["name"])
return table_columns
class TableCreateView(BaseView):
name = "table-create"
_valid_keys = {"table", "rows", "row", "columns", "pk"}
_supported_column_types = {
"text",
"integer",
"float",
"blob",
}
# Any string that does not contain a newline or start with sqlite_
_table_name_re = re.compile(r"^(?!sqlite_)[^\n]+$")
def __init__(self, datasette):
self.ds = datasette
async def post(self, request):
database_route = tilde_decode(request.url_vars["database"])
try:
db = self.ds.get_database(route=database_route)
except KeyError:
return _error(["Database not found: {}".format(database_route)], 404)
database_name = db.name
# Must have create-table permission
if not await self.ds.permission_allowed(
request.actor, "create-table", resource=database_name
):
return _error(["Permission denied"], 403)
body = await request.post_body()
try:
data = json.loads(body)
except json.JSONDecodeError as e:
return _error(["Invalid JSON: {}".format(e)])
if not isinstance(data, dict):
return _error(["JSON must be an object"])
invalid_keys = set(data.keys()) - self._valid_keys
if invalid_keys:
return _error(["Invalid keys: {}".format(", ".join(invalid_keys))])
table_name = data.get("table")
if not table_name:
return _error(["Table is required"])
if not self._table_name_re.match(table_name):
return _error(["Invalid table name"])
columns = data.get("columns")
rows = data.get("rows")
row = data.get("row")
if not columns and not rows and not row:
return _error(["columns, rows or row is required"])
if rows and row:
return _error(["Cannot specify both rows and row"])
if columns:
if rows or row:
return _error(["Cannot specify columns with rows or row"])
if not isinstance(columns, list):
return _error(["columns must be a list"])
for column in columns:
if not isinstance(column, dict):
return _error(["columns must be a list of objects"])
if not column.get("name") or not isinstance(column.get("name"), str):
return _error(["Column name is required"])
if not column.get("type"):
column["type"] = "text"
if column["type"] not in self._supported_column_types:
return _error(
["Unsupported column type: {}".format(column["type"])]
)
# No duplicate column names
dupes = {c["name"] for c in columns if columns.count(c) > 1}
if dupes:
return _error(["Duplicate column name: {}".format(", ".join(dupes))])
if row:
rows = [row]
if rows:
if not isinstance(rows, list):
return _error(["rows must be a list"])
for row in rows:
if not isinstance(row, dict):
return _error(["rows must be a list of objects"])
pk = data.get("pk")
if pk:
if not isinstance(pk, str):
return _error(["pk must be a string"])
def create_table(conn):
table = sqlite_utils.Database(conn)[table_name]
if rows:
table.insert_all(rows, pk=pk)
else:
table.create(
{c["name"]: c["type"] for c in columns},
pk=pk,
)
return table.schema
try:
schema = await db.execute_write_fn(create_table)
except Exception as e:
return _error([str(e)])
table_url = self.ds.absolute_url(
request, self.ds.urls.table(db.name, table_name)
)
table_api_url = self.ds.absolute_url(
request, self.ds.urls.table(db.name, table_name, format="json")
)
details = {
"ok": True,
"database": db.name,
"table": table_name,
"table_url": table_url,
"table_api_url": table_api_url,
"schema": schema,
}
if rows:
details["row_count"] = len(rows)
return Response.json(details, status=201)