diff --git a/datasette/app.py b/datasette/app.py index 45d34991..60a20032 100644 --- a/datasette/app.py +++ b/datasette/app.py @@ -58,6 +58,9 @@ from .views.special import ( PermissionRulesView, PermissionCheckView, TablesView, + InstanceSchemaView, + DatabaseSchemaView, + TableSchemaView, ) from .views.table import ( TableInsertView, @@ -1910,6 +1913,10 @@ class Datasette: TablesView.as_view(self), r"/-/tables(\.(?Pjson))?$", ) + add_route( + InstanceSchemaView.as_view(self), + r"/-/schema(\.(?Pjson|md))?$", + ) add_route( LogoutView.as_view(self), r"/-/logout$", @@ -1951,6 +1958,10 @@ class Datasette: r"/(?P[^\/\.]+)(\.(?P\w+))?$", ) add_route(TableCreateView.as_view(self), r"/(?P[^\/\.]+)/-/create$") + add_route( + DatabaseSchemaView.as_view(self), + r"/(?P[^\/\.]+)/-/schema(\.(?Pjson|md))?$", + ) add_route( wrap_view(QueryView, self), r"/(?P[^\/\.]+)/-/query(\.(?P\w+))?$", @@ -1975,6 +1986,10 @@ class Datasette: TableDropView.as_view(self), r"/(?P[^\/\.]+)/(?P[^\/\.]+)/-/drop$", ) + add_route( + TableSchemaView.as_view(self), + r"/(?P[^\/\.]+)/(?P
[^\/\.]+)/-/schema(\.(?Pjson|md))?$", + ) add_route( RowDeleteView.as_view(self), r"/(?P[^\/\.]+)/(?P
[^/]+?)/(?P[^/]+?)/-/delete$", diff --git a/datasette/templates/database.html b/datasette/templates/database.html index 66f288dc..42b4ca0b 100644 --- a/datasette/templates/database.html +++ b/datasette/templates/database.html @@ -56,7 +56,7 @@ {% endif %} {% if tables %} -

Tables

+

Tables schema

{% endif %} {% for table in tables %} diff --git a/datasette/templates/schema.html b/datasette/templates/schema.html new file mode 100644 index 00000000..2fd8637e --- /dev/null +++ b/datasette/templates/schema.html @@ -0,0 +1,41 @@ +{% extends "base.html" %} + +{% block title %}{% if is_instance %}Schema for all databases{% elif table_name %}Schema for {{ schemas[0].database }}.{{ table_name }}{% else %}Schema for {{ schemas[0].database }}{% endif %}{% endblock %} + +{% block body_class %}schema{% endblock %} + +{% block crumbs %} +{% if is_instance %} +{{ crumbs.nav(request=request) }} +{% elif table_name %} +{{ crumbs.nav(request=request, database=schemas[0].database, table=table_name) }} +{% else %} +{{ crumbs.nav(request=request, database=schemas[0].database) }} +{% endif %} +{% endblock %} + +{% block content %} + + +{% for item in schemas %} + {% if is_instance %} +

{{ item.database }}

+ {% endif %} + + {% if item.schema %} +
{{ item.schema }}
+ {% else %} +

No schema available for this database.

+ {% endif %} + + {% if not loop.last %} +
+ {% endif %} +{% endfor %} + +{% if not schemas %} +

No databases with viewable schemas found.

+{% endif %} +{% endblock %} diff --git a/datasette/views/special.py b/datasette/views/special.py index a1d736c5..411363ec 100644 --- a/datasette/views/special.py +++ b/datasette/views/special.py @@ -761,8 +761,6 @@ class ApiExplorerView(BaseView): async def example_links(self, request): databases = [] for name, db in self.ds.databases.items(): - if name == "_internal": - continue database_visible, _ = await self.ds.check_visibility( request.actor, action="view-database", @@ -981,3 +979,180 @@ class TablesView(BaseView): ] return Response.json({"matches": matches, "truncated": truncated}) + + +class SchemaBaseView(BaseView): + """Base class for schema views with common response formatting.""" + + has_json_alternate = False + + async def get_database_schema(self, database_name): + """Get schema SQL for a database.""" + db = self.ds.databases[database_name] + result = await db.execute( + "select group_concat(sql, ';' || CHAR(10)) as schema from sqlite_master where sql is not null" + ) + row = result.first() + return row["schema"] if row and row["schema"] else "" + + def format_json_response(self, data): + """Format data as JSON response with CORS headers if needed.""" + headers = {} + if self.ds.cors: + add_cors_headers(headers) + return Response.json(data, headers=headers) + + def format_error_response(self, error_message, format_, status=404): + """Format error response based on requested format.""" + if format_ == "json": + headers = {} + if self.ds.cors: + add_cors_headers(headers) + return Response.json( + {"ok": False, "error": error_message}, status=status, headers=headers + ) + else: + return Response.text(error_message, status=status) + + def format_markdown_response(self, heading, schema): + """Format schema as Markdown response.""" + md_output = f"# {heading}\n\n```sql\n{schema}\n```\n" + return Response.text( + md_output, headers={"content-type": "text/markdown; charset=utf-8"} + ) + + async def format_html_response( + self, request, schemas, 
is_instance=False, table_name=None + ): + """Format schema as HTML response.""" + context = { + "schemas": schemas, + "is_instance": is_instance, + } + if table_name: + context["table_name"] = table_name + return await self.render(["schema.html"], request=request, context=context) + + +class InstanceSchemaView(SchemaBaseView): + """ + Displays schema for all databases in the instance. + Supports HTML, JSON, and Markdown formats. + """ + + name = "instance_schema" + + async def get(self, request): + format_ = request.url_vars.get("format") or "html" + + # Get all databases the actor can view + allowed_databases_page = await self.ds.allowed_resources( + "view-database", + request.actor, + ) + allowed_databases = [r.parent async for r in allowed_databases_page.all()] + + # Get schema for each database + schemas = [] + for database_name in allowed_databases: + schema = await self.get_database_schema(database_name) + schemas.append({"database": database_name, "schema": schema}) + + if format_ == "json": + return self.format_json_response({"schemas": schemas}) + elif format_ == "md": + md_parts = [ + f"# Schema for {item['database']}\n\n```sql\n{item['schema']}\n```" + for item in schemas + ] + return Response.text( + "\n\n".join(md_parts), + headers={"content-type": "text/markdown; charset=utf-8"}, + ) + else: + return await self.format_html_response(request, schemas, is_instance=True) + + +class DatabaseSchemaView(SchemaBaseView): + """ + Displays schema for a specific database. + Supports HTML, JSON, and Markdown formats. 
+ """ + + name = "database_schema" + + async def get(self, request): + database_name = request.url_vars["database"] + format_ = request.url_vars.get("format") or "html" + + # Check if database exists + if database_name not in self.ds.databases: + return self.format_error_response("Database not found", format_) + + # Check view-database permission + await self.ds.ensure_permission( + action="view-database", + resource=DatabaseResource(database=database_name), + actor=request.actor, + ) + + schema = await self.get_database_schema(database_name) + + if format_ == "json": + return self.format_json_response( + {"database": database_name, "schema": schema} + ) + elif format_ == "md": + return self.format_markdown_response(f"Schema for {database_name}", schema) + else: + schemas = [{"database": database_name, "schema": schema}] + return await self.format_html_response(request, schemas) + + +class TableSchemaView(SchemaBaseView): + """ + Displays schema for a specific table. + Supports HTML, JSON, and Markdown formats. + """ + + name = "table_schema" + + async def get(self, request): + database_name = request.url_vars["database"] + table_name = request.url_vars["table"] + format_ = request.url_vars.get("format") or "html" + + # Check view-table permission + await self.ds.ensure_permission( + action="view-table", + resource=TableResource(database=database_name, table=table_name), + actor=request.actor, + ) + + # Get schema for the table + db = self.ds.databases[database_name] + result = await db.execute( + "select sql from sqlite_master where name = ? 
and sql is not null", + [table_name], + ) + row = result.first() + + # Return 404 if table doesn't exist + if not row or not row["sql"]: + return self.format_error_response("Table not found", format_) + + schema = row["sql"] + + if format_ == "json": + return self.format_json_response( + {"database": database_name, "table": table_name, "schema": schema} + ) + elif format_ == "md": + return self.format_markdown_response( + f"Schema for {database_name}.{table_name}", schema + ) + else: + schemas = [{"database": database_name, "schema": schema}] + return await self.format_html_response( + request, schemas, table_name=table_name + ) diff --git a/docs/pages.rst b/docs/pages.rst index 3d6530a3..2e54ce2f 100644 --- a/docs/pages.rst +++ b/docs/pages.rst @@ -107,3 +107,47 @@ Note that this URL includes the encoded primary key of the record. Here's that same page as JSON: `../people/uk~2Eorg~2Epublicwhip~2Fperson~2F10001.json `_ + + +.. _pages_schemas: + +Schemas +======= + +Datasette offers ``/-/schema`` endpoints to expose the SQL schema for databases and tables. + +.. _InstanceSchemaView: + +Instance schema +--------------- + +Access ``/-/schema`` to see the complete schema for all attached databases in the Datasette instance. + +Use ``/-/schema.md`` to get the same information as Markdown. + +Use ``/-/schema.json`` to get the same information as JSON, which looks like this: + +.. code-block:: json + + { + "schemas": [ + { + "database": "content", + "schema": "create table posts ..." + } + ] + } + +.. _DatabaseSchemaView: + +Database schema +--------------- + +Use ``/database-name/-/schema`` to see the complete schema for a specific database. The ``.md`` and ``.json`` extensions work here too. The JSON returns an object with ``"database"`` and ``"schema"`` keys. + +.. _TableSchemaView: + +Table schema +------------ + +Use ``/database-name/table-name/-/schema`` to see the schema for a specific table. The ``.md`` and ``.json`` extensions work here too. 
The JSON returns an object with ``"database"``, ``"table"``, and ``"schema"`` keys. diff --git a/tests/test_html.py b/tests/test_html.py index dbe993c4..9997279b 100644 --- a/tests/test_html.py +++ b/tests/test_html.py @@ -142,7 +142,7 @@ async def test_database_page(ds_client): # And a list of tables for fragment in ( - '

Tables

', + '

Tables', '

sortable

', "

pk, foreign_key_with_label, foreign_key_with_blank_label, ", ): diff --git a/tests/test_schema_endpoints.py b/tests/test_schema_endpoints.py new file mode 100644 index 00000000..5500a7b0 --- /dev/null +++ b/tests/test_schema_endpoints.py @@ -0,0 +1,248 @@ +import asyncio +import pytest +import pytest_asyncio +from datasette.app import Datasette + + +@pytest_asyncio.fixture(scope="module") +async def schema_ds(): + """Create a Datasette instance with test databases and permission config.""" + ds = Datasette( + config={ + "databases": { + "schema_private_db": {"allow": {"id": "root"}}, + } + } + ) + + # Create public database with multiple tables + public_db = ds.add_memory_database("schema_public_db") + await public_db.execute_write( + "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)" + ) + await public_db.execute_write( + "CREATE TABLE IF NOT EXISTS posts (id INTEGER PRIMARY KEY, title TEXT)" + ) + await public_db.execute_write( + "CREATE VIEW IF NOT EXISTS recent_posts AS SELECT * FROM posts ORDER BY id DESC" + ) + + # Create a database with restricted access (requires root permission) + private_db = ds.add_memory_database("schema_private_db") + await private_db.execute_write( + "CREATE TABLE IF NOT EXISTS secret_data (id INTEGER PRIMARY KEY, value TEXT)" + ) + + # Create an empty database + ds.add_memory_database("schema_empty_db") + + return ds + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "format_ext,expected_in_content", + [ + ("json", None), + ("md", ["# Schema for", "```sql"]), + ("", ["Schema for", "CREATE TABLE"]), + ], +) +async def test_database_schema_formats(schema_ds, format_ext, expected_in_content): + """Test /database/-/schema endpoint in different formats.""" + url = "/schema_public_db/-/schema" + if format_ext: + url += f".{format_ext}" + response = await schema_ds.client.get(url) + assert response.status_code == 200 + + if format_ext == "json": + data = response.json() + assert "database" in data + assert 
data["database"] == "schema_public_db" + assert "schema" in data + assert "CREATE TABLE users" in data["schema"] + else: + content = response.text + for expected in expected_in_content: + assert expected in content + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "format_ext,expected_in_content", + [ + ("json", None), + ("md", ["# Schema for", "```sql"]), + ("", ["Schema for all databases"]), + ], +) +async def test_instance_schema_formats(schema_ds, format_ext, expected_in_content): + """Test /-/schema endpoint in different formats.""" + url = "/-/schema" + if format_ext: + url += f".{format_ext}" + response = await schema_ds.client.get(url) + assert response.status_code == 200 + + if format_ext == "json": + data = response.json() + assert "schemas" in data + assert isinstance(data["schemas"], list) + db_names = [item["database"] for item in data["schemas"]] + # Should see schema_public_db and schema_empty_db, but not schema_private_db (anonymous user) + assert "schema_public_db" in db_names + assert "schema_empty_db" in db_names + assert "schema_private_db" not in db_names + # Check schemas are present + for item in data["schemas"]: + if item["database"] == "schema_public_db": + assert "CREATE TABLE users" in item["schema"] + else: + content = response.text + for expected in expected_in_content: + assert expected in content + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "format_ext,expected_in_content", + [ + ("json", None), + ("md", ["# Schema for", "```sql"]), + ("", ["Schema for users"]), + ], +) +async def test_table_schema_formats(schema_ds, format_ext, expected_in_content): + """Test /database/table/-/schema endpoint in different formats.""" + url = "/schema_public_db/users/-/schema" + if format_ext: + url += f".{format_ext}" + response = await schema_ds.client.get(url) + assert response.status_code == 200 + + if format_ext == "json": + data = response.json() + assert "database" in data + assert data["database"] == "schema_public_db" + assert 
"table" in data + assert data["table"] == "users" + assert "schema" in data + assert "CREATE TABLE users" in data["schema"] + else: + content = response.text + for expected in expected_in_content: + assert expected in content + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "url", + [ + "/schema_private_db/-/schema.json", + "/schema_private_db/secret_data/-/schema.json", + ], +) +async def test_schema_permission_enforcement(schema_ds, url): + """Test that permissions are enforced for schema endpoints.""" + # Anonymous user should get 403 + response = await schema_ds.client.get(url) + assert response.status_code == 403 + + # Authenticated user with permission should succeed + response = await schema_ds.client.get( + url, + cookies={"ds_actor": schema_ds.client.actor_cookie({"id": "root"})}, + ) + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_instance_schema_respects_database_permissions(schema_ds): + """Test that /-/schema only shows databases the user can view.""" + # Anonymous user should only see public databases + response = await schema_ds.client.get("/-/schema.json") + assert response.status_code == 200 + data = response.json() + db_names = [item["database"] for item in data["schemas"]] + assert "schema_public_db" in db_names + assert "schema_empty_db" in db_names + assert "schema_private_db" not in db_names + + # Authenticated user should see all databases + response = await schema_ds.client.get( + "/-/schema.json", + cookies={"ds_actor": schema_ds.client.actor_cookie({"id": "root"})}, + ) + assert response.status_code == 200 + data = response.json() + db_names = [item["database"] for item in data["schemas"]] + assert "schema_public_db" in db_names + assert "schema_empty_db" in db_names + assert "schema_private_db" in db_names + + +@pytest.mark.asyncio +async def test_database_schema_with_multiple_tables(schema_ds): + """Test schema with multiple tables in a database.""" + response = await 
schema_ds.client.get("/schema_public_db/-/schema.json") + assert response.status_code == 200 + data = response.json() + schema = data["schema"] + + # All objects should be in the schema + assert "CREATE TABLE users" in schema + assert "CREATE TABLE posts" in schema + assert "CREATE VIEW recent_posts" in schema + + +@pytest.mark.asyncio +async def test_empty_database_schema(schema_ds): + """Test schema for an empty database.""" + response = await schema_ds.client.get("/schema_empty_db/-/schema.json") + assert response.status_code == 200 + data = response.json() + assert data["database"] == "schema_empty_db" + assert data["schema"] == "" + + +@pytest.mark.asyncio +async def test_database_not_exists(schema_ds): + """Test schema for a non-existent database returns 404.""" + # Test JSON format + response = await schema_ds.client.get("/nonexistent_db/-/schema.json") + assert response.status_code == 404 + data = response.json() + assert data["ok"] is False + assert "not found" in data["error"].lower() + + # Test HTML format (returns text) + response = await schema_ds.client.get("/nonexistent_db/-/schema") + assert response.status_code == 404 + assert "not found" in response.text.lower() + + # Test Markdown format (returns text) + response = await schema_ds.client.get("/nonexistent_db/-/schema.md") + assert response.status_code == 404 + assert "not found" in response.text.lower() + + +@pytest.mark.asyncio +async def test_table_not_exists(schema_ds): + """Test schema for a non-existent table returns 404.""" + # Test JSON format + response = await schema_ds.client.get("/schema_public_db/nonexistent/-/schema.json") + assert response.status_code == 404 + data = response.json() + assert data["ok"] is False + assert "not found" in data["error"].lower() + + # Test HTML format (returns text) + response = await schema_ds.client.get("/schema_public_db/nonexistent/-/schema") + assert response.status_code == 404 + assert "not found" in response.text.lower() + + # Test Markdown format 
(returns text) + response = await schema_ds.client.get("/schema_public_db/nonexistent/-/schema.md") + assert response.status_code == 404 + assert "not found" in response.text.lower()